Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6929,6 +6929,15 @@ Throw an error if there are pending patch parts when exporting a merge tree part
)", 0) \
DECLARE(Bool, serialize_string_in_memory_with_zero_byte, true, R"(
Serialize String values during aggregation with zero byte at the end. Enable to keep compatibility when querying cluster of incompatible versions.
)", 0) \
DECLARE(Timezone, iceberg_partition_timezone, "", R"(
Time zone by which partitioning of Iceberg tables was performed.
Possible values:

- Any valid timezone, e.g. `Europe/Berlin`, `UTC` or `Zulu`
- `` (empty value) - use server or session timezone

Default value is empty.
)", 0) \
\
/* ####################################################### */ \
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
{"cluster_table_function_split_granularity", "file", "file", "New setting."},
{"cluster_table_function_buckets_batch_size", 0, 0, "New setting."},
{"iceberg_partition_timezone", "", "", "New setting."},
{"export_merge_tree_part_throw_on_pending_mutations", true, true, "New setting."},
{"export_merge_tree_part_throw_on_pending_patch_parts", true, true, "New setting."},
});
Expand Down
13 changes: 12 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <Analyzer/FunctionNode.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/IColumn_fwd.h>
#include <Core/ColumnWithTypeAndName.h>
#include <Core/ColumnsWithTypeAndName.h>
Expand Down Expand Up @@ -78,6 +79,7 @@ namespace Setting
extern const SettingsUInt64 output_format_compression_level;
extern const SettingsUInt64 output_format_compression_zstd_window_log;
extern const SettingsBool write_full_path_in_iceberg_metadata;
extern const SettingsTimezone iceberg_partition_timezone;
}

namespace DataLakeStorageSetting
Expand Down Expand Up @@ -966,7 +968,7 @@ ChunkPartitioner::ChunkPartitioner(

auto & factory = FunctionFactory::instance();

auto transform_and_argument = Iceberg::parseTransformAndArgument(transform_name);
auto transform_and_argument = Iceberg::parseTransformAndArgument(transform_name, context->getSettingsRef()[Setting::iceberg_partition_timezone]);
if (!transform_and_argument)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown transform {}", transform_name);

Expand All @@ -980,6 +982,7 @@ ChunkPartitioner::ChunkPartitioner(
result_data_types.push_back(function->getReturnType(columns_for_function));
functions.push_back(function);
function_params.push_back(transform_and_argument->argument);
function_time_zones.push_back(transform_and_argument->time_zone);
columns_to_apply.push_back(column_name);
}
}
Expand Down Expand Up @@ -1016,6 +1019,14 @@ ChunkPartitioner::partitionChunk(const Chunk & chunk)
arguments.push_back(ColumnWithTypeAndName(const_column->clone(), type, "#"));
}
arguments.push_back(name_to_column[columns_to_apply[transform_ind]]);
if (function_time_zones[transform_ind].has_value())
{
auto type = std::make_shared<DataTypeString>();
auto column_value = ColumnString::create();
column_value->insert(*function_time_zones[transform_ind]);
auto const_column = ColumnConst::create(std::move(column_value), chunk.getNumRows());
arguments.push_back(ColumnWithTypeAndName(const_column->clone(), type, "PartitioningTimezone"));
}
auto result
= functions[transform_ind]->build(arguments)->execute(arguments, std::make_shared<DataTypeString>(), chunk.getNumRows(), false);
for (size_t i = 0; i < chunk.getNumRows(); ++i)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ class ChunkPartitioner

std::vector<FunctionOverloadResolverPtr> functions;
std::vector<std::optional<size_t>> function_params;
std::vector<std::optional<String>> function_time_zones;
std::vector<String> columns_to_apply;
std::vector<DataTypePtr> result_data_types;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
#include <compare>
#include <optional>

#include <Interpreters/Context.h>
#include <Interpreters/IcebergMetadataLog.h>

#include <Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.h>
#include <Storages/ObjectStorage/Utils.h>

#include <Core/Settings.h>
#include <Core/TypeId.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Poco/JSON/Parser.h>
Expand All @@ -32,6 +34,11 @@ namespace DB::ErrorCodes
extern const int BAD_ARGUMENTS;
}

/// Settings referenced by this file. They are only declared here (`extern`); the definitions live elsewhere.
namespace DB::Setting
{
/// Time zone setting that is read from the context and forwarded to getASTFromTransform
/// when the partition key AST is built for a manifest file.
extern const SettingsTimezone iceberg_partition_timezone;
}

namespace DB::Iceberg
{

Expand Down Expand Up @@ -217,7 +224,7 @@ ManifestFileContent::ManifestFileContent(
auto transform_name = partition_specification_field->getValue<String>(f_partition_transform);
auto partition_name = partition_specification_field->getValue<String>(f_partition_name);
common_partition_specification.emplace_back(source_id, transform_name, partition_name);
auto partition_ast = getASTFromTransform(transform_name, numeric_column_name);
auto partition_ast = getASTFromTransform(transform_name, numeric_column_name, context->getSettingsRef()[Setting::iceberg_partition_timezone]);
/// Unsupported partition key expression
if (partition_ast == nullptr)
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ using namespace DB;
namespace DB::Iceberg
{

DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String & column_name)
DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String & column_name, const String & time_zone)
{
auto transform_and_argument = parseTransformAndArgument(transform_name_src);
auto transform_and_argument = parseTransformAndArgument(transform_name_src, time_zone);
if (!transform_and_argument)
{
LOG_WARNING(&Poco::Logger::get("Iceberg Partition Pruning"), "Cannot parse iceberg transform name: {}.", transform_name_src);
Expand All @@ -47,6 +47,13 @@ DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String &
return makeASTFunction(
transform_and_argument->transform_name, std::make_shared<DB::ASTLiteral>(*transform_and_argument->argument), std::make_shared<DB::ASTIdentifier>(column_name));
}
if (transform_and_argument->time_zone)
{
return makeASTFunction(
transform_and_argument->transform_name,
std::make_shared<DB::ASTIdentifier>(column_name),
std::make_shared<DB::ASTLiteral>(*transform_and_argument->time_zone));
}
return makeASTFunction(transform_and_argument->transform_name, std::make_shared<DB::ASTIdentifier>(column_name));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace DB::Iceberg
struct ManifestFileEntry;
class ManifestFileContent;

DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String & column_name);
DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String & column_name, const String & time_zone);

/// Prune specific data files based on manifest content
class ManifestFilesPruner
Expand Down
24 changes: 15 additions & 9 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ namespace ProfileEvents
/// Extern declarations of the settings this file refers to.
namespace DB::Setting
{
extern const SettingsUInt64 output_format_compression_level;
/// NOTE(review): parseTransformAndArgument receives the time zone as a parameter, and no visible
/// part of this file reads this setting directly — confirm the declaration is actually needed here.
extern const SettingsTimezone iceberg_partition_timezone;
}

namespace DB::Iceberg
Expand Down Expand Up @@ -112,27 +113,32 @@ void writeMessageToFile(
}
}

std::optional<TransformAndArgument> parseTransformAndArgument(const String & transform_name_src)

std::optional<TransformAndArgument> parseTransformAndArgument(const String & transform_name_src, const String & time_zone)
{
std::string transform_name = Poco::toLower(transform_name_src);

std::optional<String> time_zone_opt;
if (!time_zone.empty())
time_zone_opt = time_zone;

if (transform_name == "year" || transform_name == "years")
return TransformAndArgument{"toYearNumSinceEpoch", std::nullopt};
return TransformAndArgument{"toYearNumSinceEpoch", std::nullopt, time_zone_opt};

if (transform_name == "month" || transform_name == "months")
return TransformAndArgument{"toMonthNumSinceEpoch", std::nullopt};
return TransformAndArgument{"toMonthNumSinceEpoch", std::nullopt, time_zone_opt};

if (transform_name == "day" || transform_name == "date" || transform_name == "days" || transform_name == "dates")
return TransformAndArgument{"toRelativeDayNum", std::nullopt};
return TransformAndArgument{"toRelativeDayNum", std::nullopt, time_zone_opt};

if (transform_name == "hour" || transform_name == "hours")
return TransformAndArgument{"toRelativeHourNum", std::nullopt};
return TransformAndArgument{"toRelativeHourNum", std::nullopt, time_zone_opt};

if (transform_name == "identity")
return TransformAndArgument{"identity", std::nullopt};
return TransformAndArgument{"identity", std::nullopt, std::nullopt};

if (transform_name == "void")
return TransformAndArgument{"tuple", std::nullopt};
return TransformAndArgument{"tuple", std::nullopt, std::nullopt};

if (transform_name.starts_with("truncate") || transform_name.starts_with("bucket"))
{
Expand All @@ -156,11 +162,11 @@ std::optional<TransformAndArgument> parseTransformAndArgument(const String & tra

if (transform_name.starts_with("truncate"))
{
return TransformAndArgument{"icebergTruncate", argument};
return TransformAndArgument{"icebergTruncate", argument, std::nullopt};
}
else if (transform_name.starts_with("bucket"))
{
return TransformAndArgument{"icebergBucket", argument};
return TransformAndArgument{"icebergBucket", argument, std::nullopt};
}
}
return std::nullopt;
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ struct TransformAndArgument
{
/// Name of the function that implements the transform (e.g. "toYearNumSinceEpoch", "icebergBucket").
String transform_name;
/// Numeric parameter of the transform: supplied by the truncate/bucket branches,
/// std::nullopt for every other transform.
std::optional<size_t> argument;
/// Time zone handed to the function as an additional argument. Only the year/month/day/hour
/// transforms carry it, and only when a non-empty time zone was given; std::nullopt otherwise.
std::optional<String> time_zone;
};

std::optional<TransformAndArgument> parseTransformAndArgument(const String & transform_name_src);
std::optional<TransformAndArgument> parseTransformAndArgument(const String & transform_name_src, const String & time_zone);

Poco::JSON::Object::Ptr getMetadataJSONObject(
const String & metadata_file_path,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<clickhouse>
<!-- Sets the iceberg_partition_timezone setting to UTC in the default settings profile. -->
<profiles>
<default>
<iceberg_partition_timezone>UTC</iceberg_partition_timezone>
</default>
</profiles>
</clickhouse>
3 changes: 3 additions & 0 deletions tests/integration/test_database_iceberg/configs/timezone.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<clickhouse>
<!-- Server time zone for the test instance. -->
<!-- NOTE(review): presumably chosen to differ from UTC so that the partition time zone setting has a visible effect; confirm against the test. -->
<timezone>Asia/Istanbul</timezone>
</clickhouse>
Loading
Loading