From dacd703b6730fd547512c55cdf5eec10947ede86 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 27 Jan 2026 15:09:24 +0100 Subject: [PATCH 1/2] iceberg_partition_timezone setting --- src/Core/Settings.cpp | 9 +++++++ src/Core/SettingsChangesHistory.cpp | 1 + .../DataLakes/Iceberg/IcebergWrites.cpp | 13 +++++++++- .../DataLakes/Iceberg/IcebergWrites.h | 1 + .../DataLakes/Iceberg/ManifestFile.cpp | 9 ++++++- .../Iceberg/ManifestFilesPruning.cpp | 11 +++++++-- .../DataLakes/Iceberg/ManifestFilesPruning.h | 2 +- .../ObjectStorage/DataLakes/Iceberg/Utils.cpp | 24 ++++++++++++------- .../ObjectStorage/DataLakes/Iceberg/Utils.h | 3 ++- 9 files changed, 58 insertions(+), 15 deletions(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 9977550b19de..66d2b3a68ce1 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6923,6 +6923,15 @@ This is not a hard limit, and it highly depends on the output format granularity )", 0) \ DECLARE(Bool, serialize_string_in_memory_with_zero_byte, true, R"( Serialize String values during aggregation with zero byte at the end. Enable to keep compatibility when querying cluster of incompatible versions. +)", 0) \ + DECLARE(Timezone, iceberg_partition_timezone, "", R"( +Time zone by which partitioning of Iceberg tables was performed. +Possible values: + +- Any valid timezone, e.g. `Europe/Berlin`, `UTC` or `Zulu` +- `` (empty value) - use server or session timezone + +Default value is empty. 
)", 0) \ \ /* ####################################################### */ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 3d57025ddd76..7e65c73c5aab 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -64,6 +64,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."}, {"cluster_table_function_split_granularity", "file", "file", "New setting."}, {"cluster_table_function_buckets_batch_size", 0, 0, "New setting."}, + {"iceberg_partition_timezone", "", "", "New setting."}, }); addSettingsChanges(settings_changes_history, "25.8", { diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp index 75fe44315faa..01ad28b2e593 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -78,6 +79,7 @@ namespace Setting extern const SettingsUInt64 output_format_compression_level; extern const SettingsUInt64 output_format_compression_zstd_window_log; extern const SettingsBool write_full_path_in_iceberg_metadata; +extern const SettingsTimezone iceberg_partition_timezone; } namespace DataLakeStorageSetting @@ -966,7 +968,7 @@ ChunkPartitioner::ChunkPartitioner( auto & factory = FunctionFactory::instance(); - auto transform_and_argument = Iceberg::parseTransformAndArgument(transform_name); + auto transform_and_argument = Iceberg::parseTransformAndArgument(transform_name, context->getSettingsRef()[Setting::iceberg_partition_timezone]); if (!transform_and_argument) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown transform {}", transform_name); @@ -980,6 +982,7 @@ ChunkPartitioner::ChunkPartitioner( 
result_data_types.push_back(function->getReturnType(columns_for_function)); functions.push_back(function); function_params.push_back(transform_and_argument->argument); + function_time_zones.push_back(transform_and_argument->time_zone); columns_to_apply.push_back(column_name); } } @@ -1016,6 +1019,14 @@ ChunkPartitioner::partitionChunk(const Chunk & chunk) arguments.push_back(ColumnWithTypeAndName(const_column->clone(), type, "#")); } arguments.push_back(name_to_column[columns_to_apply[transform_ind]]); + if (function_time_zones[transform_ind].has_value()) + { + auto type = std::make_shared(); + auto column_value = ColumnString::create(); + column_value->insert(*function_time_zones[transform_ind]); + auto const_column = ColumnConst::create(std::move(column_value), chunk.getNumRows()); + arguments.push_back(ColumnWithTypeAndName(const_column->clone(), type, "PartitioningTimezone")); + } auto result = functions[transform_ind]->build(arguments)->execute(arguments, std::make_shared(), chunk.getNumRows(), false); for (size_t i = 0; i < chunk.getNumRows(); ++i) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.h index f8b206162ae0..6d5658ab027a 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.h @@ -195,6 +195,7 @@ class ChunkPartitioner std::vector functions; std::vector> function_params; + std::vector> function_time_zones; std::vector columns_to_apply; std::vector result_data_types; }; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp index d5c50a6c9994..f67508380940 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -12,6 +13,7 @@ #include #include 
+#include #include #include #include @@ -32,6 +34,11 @@ namespace DB::ErrorCodes extern const int BAD_ARGUMENTS; } +namespace DB::Setting +{ + extern const SettingsTimezone iceberg_partition_timezone; +} + namespace DB::Iceberg { @@ -217,7 +224,7 @@ ManifestFileContent::ManifestFileContent( auto transform_name = partition_specification_field->getValue(f_partition_transform); auto partition_name = partition_specification_field->getValue(f_partition_name); common_partition_specification.emplace_back(source_id, transform_name, partition_name); - auto partition_ast = getASTFromTransform(transform_name, numeric_column_name); + auto partition_ast = getASTFromTransform(transform_name, numeric_column_name, context->getSettingsRef()[Setting::iceberg_partition_timezone]); /// Unsupported partition key expression if (partition_ast == nullptr) continue; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.cpp index 51c68dc7a18c..d14b6c0a12b9 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.cpp @@ -26,9 +26,9 @@ using namespace DB; namespace DB::Iceberg { -DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String & column_name) +DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String & column_name, const String & time_zone) { - auto transform_and_argument = parseTransformAndArgument(transform_name_src); + auto transform_and_argument = parseTransformAndArgument(transform_name_src, time_zone); if (!transform_and_argument) { LOG_WARNING(&Poco::Logger::get("Iceberg Partition Pruning"), "Cannot parse iceberg transform name: {}.", transform_name_src); @@ -47,6 +47,13 @@ DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String & return makeASTFunction( transform_and_argument->transform_name, 
std::make_shared(*transform_and_argument->argument), std::make_shared(column_name)); } + if (transform_and_argument->time_zone) + { + return makeASTFunction( + transform_and_argument->transform_name, + std::make_shared(column_name), + std::make_shared(*transform_and_argument->time_zone)); + } return makeASTFunction(transform_and_argument->transform_name, std::make_shared(column_name)); } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.h index f17a1ee97326..01a891014fb0 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.h @@ -30,7 +30,7 @@ namespace DB::Iceberg struct ManifestFileEntry; class ManifestFileContent; -DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String & column_name); +DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String & column_name, const String & time_zone); /// Prune specific data files based on manifest content class ManifestFilesPruner diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp index 435b65086f64..f4540f68bb8e 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp @@ -74,6 +74,7 @@ namespace ProfileEvents namespace DB::Setting { extern const SettingsUInt64 output_format_compression_level; + extern const SettingsTimezone iceberg_partition_timezone; } namespace DB::Iceberg @@ -112,27 +113,32 @@ void writeMessageToFile( } } -std::optional parseTransformAndArgument(const String & transform_name_src) + +std::optional parseTransformAndArgument(const String & transform_name_src, const String & time_zone) { std::string transform_name = Poco::toLower(transform_name_src); + std::optional time_zone_opt; + if (!time_zone.empty()) + time_zone_opt = 
time_zone; + if (transform_name == "year" || transform_name == "years") - return TransformAndArgument{"toYearNumSinceEpoch", std::nullopt}; + return TransformAndArgument{"toYearNumSinceEpoch", std::nullopt, time_zone_opt}; if (transform_name == "month" || transform_name == "months") - return TransformAndArgument{"toMonthNumSinceEpoch", std::nullopt}; + return TransformAndArgument{"toMonthNumSinceEpoch", std::nullopt, time_zone_opt}; if (transform_name == "day" || transform_name == "date" || transform_name == "days" || transform_name == "dates") - return TransformAndArgument{"toRelativeDayNum", std::nullopt}; + return TransformAndArgument{"toRelativeDayNum", std::nullopt, time_zone_opt}; if (transform_name == "hour" || transform_name == "hours") - return TransformAndArgument{"toRelativeHourNum", std::nullopt}; + return TransformAndArgument{"toRelativeHourNum", std::nullopt, time_zone_opt}; if (transform_name == "identity") - return TransformAndArgument{"identity", std::nullopt}; + return TransformAndArgument{"identity", std::nullopt, std::nullopt}; if (transform_name == "void") - return TransformAndArgument{"tuple", std::nullopt}; + return TransformAndArgument{"tuple", std::nullopt, std::nullopt}; if (transform_name.starts_with("truncate") || transform_name.starts_with("bucket")) { @@ -156,11 +162,11 @@ std::optional parseTransformAndArgument(const String & tra if (transform_name.starts_with("truncate")) { - return TransformAndArgument{"icebergTruncate", argument}; + return TransformAndArgument{"icebergTruncate", argument, std::nullopt}; } else if (transform_name.starts_with("bucket")) { - return TransformAndArgument{"icebergBucket", argument}; + return TransformAndArgument{"icebergBucket", argument, std::nullopt}; } } return std::nullopt; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h index a1b559b05810..f2ac3a785c98 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h +++ 
b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h @@ -38,9 +38,10 @@ struct TransformAndArgument { String transform_name; std::optional argument; + std::optional time_zone; }; -std::optional parseTransformAndArgument(const String & transform_name_src); +std::optional parseTransformAndArgument(const String & transform_name_src, const String & time_zone); Poco::JSON::Object::Ptr getMetadataJSONObject( const String & metadata_file_path, From 4502cdcc83f926ac1965dc1960744971b4b5b1bf Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 27 Jan 2026 18:49:46 +0100 Subject: [PATCH 2/2] Test --- .../configs/iceberg_partition_timezone.xml | 7 + .../configs/timezone.xml | 3 + .../test_partition_timezone.py | 178 ++++++++++++++++++ 3 files changed, 188 insertions(+) create mode 100644 tests/integration/test_database_iceberg/configs/iceberg_partition_timezone.xml create mode 100644 tests/integration/test_database_iceberg/configs/timezone.xml create mode 100644 tests/integration/test_database_iceberg/test_partition_timezone.py diff --git a/tests/integration/test_database_iceberg/configs/iceberg_partition_timezone.xml b/tests/integration/test_database_iceberg/configs/iceberg_partition_timezone.xml new file mode 100644 index 000000000000..40aebd33c515 --- /dev/null +++ b/tests/integration/test_database_iceberg/configs/iceberg_partition_timezone.xml @@ -0,0 +1,7 @@ + + + + UTC + + + diff --git a/tests/integration/test_database_iceberg/configs/timezone.xml b/tests/integration/test_database_iceberg/configs/timezone.xml new file mode 100644 index 000000000000..269e52ef2247 --- /dev/null +++ b/tests/integration/test_database_iceberg/configs/timezone.xml @@ -0,0 +1,3 @@ + + Asia/Istanbul + \ No newline at end of file diff --git a/tests/integration/test_database_iceberg/test_partition_timezone.py b/tests/integration/test_database_iceberg/test_partition_timezone.py new file mode 100644 index 000000000000..55bb525cb0ca --- /dev/null +++ 
b/tests/integration/test_database_iceberg/test_partition_timezone.py @@ -0,0 +1,178 @@ +import glob +import json +import logging +import os +import random +import time +import uuid +from datetime import datetime, timedelta + +import pyarrow as pa +import pytest +import requests +import urllib3 +import pytz +from minio import Minio +from pyiceberg.catalog import load_catalog +from pyiceberg.partitioning import PartitionField, PartitionSpec, UNPARTITIONED_PARTITION_SPEC +from pyiceberg.schema import Schema +from pyiceberg.table.sorting import SortField, SortOrder +from pyiceberg.transforms import DayTransform, IdentityTransform +from pyiceberg.types import ( + DoubleType, + LongType, + FloatType, + NestedField, + StringType, + StructType, + TimestampType, + TimestamptzType +) +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER + +from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm +from helpers.config_cluster import minio_secret_key, minio_access_key +from helpers.s3_tools import get_file_contents, list_s3_objects, prepare_s3_bucket +from helpers.test_tools import TSV, csv_compare +from helpers.config_cluster import minio_secret_key + +BASE_URL = "http://rest:8181/v1" +BASE_URL_LOCAL = "http://localhost:8182/v1" +BASE_URL_LOCAL_RAW = "http://localhost:8182" + +CATALOG_NAME = "demo" + +DEFAULT_PARTITION_SPEC = PartitionSpec( + PartitionField( + source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day" + ) +) +DEFAULT_SORT_ORDER = SortOrder(SortField(source_id=2, transform=IdentityTransform())) +DEFAULT_SCHEMA = Schema( + NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=False), + NestedField(field_id=2, name="value", field_type=LongType(), required=False), +) + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster = ClickHouseCluster(__file__) + cluster.add_instance( + "node1", + main_configs=["configs/timezone.xml", "configs/cluster.xml"], + 
user_configs=["configs/iceberg_partition_timezone.xml"], + stay_alive=True, + with_iceberg_catalog=True, + with_zookeeper=True, + ) + + logging.info("Starting cluster...") + cluster.start() + + # TODO: properly wait for container + time.sleep(10) + + yield cluster + + finally: + cluster.shutdown() + + +def load_catalog_impl(started_cluster): + return load_catalog( + CATALOG_NAME, + **{ + "uri": BASE_URL_LOCAL_RAW, + "type": "rest", + "s3.endpoint": f"http://{started_cluster.get_instance_ip('minio')}:9000", + "s3.access-key-id": minio_access_key, + "s3.secret-access-key": minio_secret_key, + }, + ) + + +def create_table( + catalog, + namespace, + table, + schema=DEFAULT_SCHEMA, + partition_spec=DEFAULT_PARTITION_SPEC, + sort_order=DEFAULT_SORT_ORDER, +): + return catalog.create_table( + identifier=f"{namespace}.{table}", + schema=schema, + location=f"s3://warehouse-rest/data", + partition_spec=partition_spec, + sort_order=sort_order, + ) + + +def create_clickhouse_iceberg_database( + node, name, additional_settings={}, engine='DataLakeCatalog' +): + settings = { + "catalog_type": "rest", + "warehouse": "demo", + "storage_endpoint": "http://minio:9000/warehouse-rest", + } + + settings.update(additional_settings) + + node.query( + f""" +DROP DATABASE IF EXISTS {name}; +SET allow_database_iceberg=true; +SET write_full_path_in_iceberg_metadata=1; +CREATE DATABASE {name} ENGINE = {engine}('{BASE_URL}', 'minio', '{minio_secret_key}') +SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))} + """ + ) + show_result = node.query(f"SHOW DATABASE {name}") + assert minio_secret_key not in show_result + assert "HIDDEN" in show_result + + +def test_partition_timezone(started_cluster): + catalog = load_catalog_impl(started_cluster) + catalog.create_namespace("timezone_ns") + table = create_table( + catalog, + "timezone_ns", + "tz_table", + ) + + # catalog accepts data in UTC + data = [{"datetime": datetime(2024, 1, 1, 20, 0), "value": 1}, # partition 20240101 +
{"datetime": datetime(2024, 1, 1, 23, 0), "value": 2}, # partition 20240101 + {"datetime": datetime(2024, 1, 2, 2, 0), "value": 3}] # partition 20240102 + df = pa.Table.from_pylist(data) + table.append(df) + + node = started_cluster.instances["node1"] + create_clickhouse_iceberg_database(node, CATALOG_NAME) + + # server timezone is Asia/Istanbul (UTC+3) + assert node.query(f""" + SELECT datetime, value + FROM {CATALOG_NAME}.`timezone_ns.tz_table` + ORDER BY datetime + """, timeout=10) == TSV( + [ + ["2024-01-01 23:00:00.000000", 1], + ["2024-01-02 02:00:00.000000", 2], + ["2024-01-02 05:00:00.000000", 3], + ]) + + # partitioning works correctly + assert node.query(f""" + SELECT datetime, value + FROM {CATALOG_NAME}.`timezone_ns.tz_table` + WHERE datetime >= '2024-01-02 00:00:00' + ORDER BY datetime + """, timeout=10) == TSV( + [ + ["2024-01-02 02:00:00.000000", 2], + ["2024-01-02 05:00:00.000000", 3], + ])