1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
@@ -74,6 +74,7 @@ enum class AccessType : uint8_t
enabled implicitly by the grant ALTER_TABLE */\
M(ALTER_SETTINGS, "ALTER SETTING, ALTER MODIFY SETTING, MODIFY SETTING, RESET SETTING", TABLE, ALTER_TABLE) /* allows to execute ALTER MODIFY SETTING */\
M(ALTER_MOVE_PARTITION, "ALTER MOVE PART, MOVE PARTITION, MOVE PART", TABLE, ALTER_TABLE) \
M(ALTER_EXPORT_PART, "ALTER EXPORT PART, EXPORT PART", TABLE, ALTER_TABLE) \
M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION", TABLE, ALTER_TABLE) \
M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \
M(ALTER_UNLOCK_SNAPSHOT, "UNLOCK SNAPSHOT", TABLE, ALTER_TABLE) \
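
The access type declared above should surface like any other ALTER_* privilege. A quick way to confirm the registration, assuming the new entry shows up in system.privileges the same way its siblings do:

```sql
-- Sketch: inspect the new access type; the column set follows the existing system.privileges schema.
SELECT name, aliases, level, parent_group
FROM system.privileges
WHERE name = 'ALTER EXPORT PART';
```
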
1 change: 1 addition & 0 deletions src/CMakeLists.txt
@@ -138,6 +138,7 @@ add_headers_and_sources(dbms Storages/ObjectStorage/Azure)
add_headers_and_sources(dbms Storages/ObjectStorage/S3)
add_headers_and_sources(dbms Storages/ObjectStorage/HDFS)
add_headers_and_sources(dbms Storages/ObjectStorage/Local)
add_headers_and_sources(dbms Storages/ObjectStorage/MergeTree)
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes)
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/Iceberg)
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/DeltaLake)
1 change: 1 addition & 0 deletions src/Core/ServerSettings.cpp
@@ -99,6 +99,7 @@ namespace DB
DECLARE(UInt64, max_unexpected_parts_loading_thread_pool_size, 8, R"(The number of threads to load inactive set of data parts (Unexpected ones) at startup.)", 0) \
DECLARE(UInt64, max_parts_cleaning_thread_pool_size, 128, R"(The number of threads for concurrent removal of inactive data parts.)", 0) \
DECLARE(UInt64, max_mutations_bandwidth_for_server, 0, R"(The maximum read speed of all mutations on server in bytes per second. Zero means unlimited.)", 0) \
DECLARE(UInt64, max_exports_bandwidth_for_server, 0, R"(The maximum read speed of all exports on server in bytes per second. Zero means unlimited.)", 0) \
DECLARE(UInt64, max_merges_bandwidth_for_server, 0, R"(The maximum read speed of all merges on server in bytes per second. Zero means unlimited.)", 0) \
DECLARE(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, R"(The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited.)", 0) \
DECLARE(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, R"(The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited.)", 0) \
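
Like the neighbouring merge and mutation limits, the new export bandwidth cap is a server-level setting (zero means unlimited). Assuming it is exposed through system.server_settings like the other entries declared here, it can be checked with:

```sql
-- Sketch: confirm the server-wide export throttle; a value of 0 means exports are not throttled.
SELECT name, value, description
FROM system.server_settings
WHERE name = 'max_exports_bandwidth_for_server';
```
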
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
@@ -6895,6 +6895,9 @@ Execute request to object storage as remote on one of object_storage_cluster nod
DECLARE_WITH_ALIAS(Bool, allow_experimental_time_series_aggregate_functions, false, R"(
Experimental timeSeries* aggregate functions for Prometheus-like timeseries resampling, rate, delta calculation.
)", EXPERIMENTAL, allow_experimental_ts_to_grid_aggregate_function) \
DECLARE_WITH_ALIAS(Bool, allow_experimental_export_merge_tree_part, false, R"(
Allow the experimental EXPORT PART command that exports a MergeTree data part to a target table.
)", EXPERIMENTAL, allow_experimental_export_merge_tree_part) \
\

/* ####################################################### */ \
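
Because the setting is declared EXPERIMENTAL, a session presumably has to opt in before the new ALTER can be used; a minimal sketch:

```sql
-- Sketch: enable the experimental feature for the current session (defaults to false).
SET allow_experimental_export_merge_tree_part = 1;
```
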
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
@@ -77,6 +77,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
{"object_storage_remote_initiator", false, false, "New setting."},
{"allow_experimental_export_merge_tree_part", false, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.6",
{
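
With the setting registered in the compatibility history above, its presence and default on a running server can be verified with:

```sql
-- Sketch: the column set follows the standard system.settings schema.
SELECT name, value, changed, description
FROM system.settings
WHERE name = 'allow_experimental_export_merge_tree_part';
```
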
3 changes: 2 additions & 1 deletion src/Databases/DatabaseReplicated.cpp
@@ -2110,7 +2110,8 @@ bool DatabaseReplicated::shouldReplicateQuery(const ContextPtr & query_context,
if (const auto * alter = query_ptr->as<const ASTAlterQuery>())
{
if (alter->isAttachAlter() || alter->isFetchAlter() || alter->isDropPartitionAlter()
|| is_keeper_map_table(query_ptr) || alter->isFreezeAlter() || alter->isUnlockSnapshot())
|| is_keeper_map_table(query_ptr) || alter->isFreezeAlter() || alter->isUnlockSnapshot()
|| alter->isExportPartAlter())
return false;

if (has_many_shards() || !is_replicated_table(query_ptr))
2 changes: 2 additions & 0 deletions src/Disks/ObjectStorages/IObjectStorage.h
@@ -138,6 +138,8 @@ struct RelativePathWithMetadata
virtual ~RelativePathWithMetadata() = default;

virtual std::string getFileName() const { return std::filesystem::path(relative_path).filename(); }
virtual std::string getFileNameWithoutExtension() const { return std::filesystem::path(relative_path).stem(); }

virtual std::string getPath() const { return relative_path; }
virtual bool isArchive() const { return false; }
virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
11 changes: 11 additions & 0 deletions src/Interpreters/Context.cpp
@@ -289,6 +289,7 @@ namespace ServerSetting
extern const ServerSettingsUInt64 max_local_write_bandwidth_for_server;
extern const ServerSettingsUInt64 max_merges_bandwidth_for_server;
extern const ServerSettingsUInt64 max_mutations_bandwidth_for_server;
extern const ServerSettingsUInt64 max_exports_bandwidth_for_server;
extern const ServerSettingsUInt64 max_remote_read_network_bandwidth_for_server;
extern const ServerSettingsUInt64 max_remote_write_network_bandwidth_for_server;
extern const ServerSettingsUInt64 max_replicated_fetches_network_bandwidth_for_server;
@@ -505,6 +506,8 @@ struct ContextSharedPart : boost::noncopyable
mutable ThrottlerPtr mutations_throttler; /// A server-wide throttler for mutations
mutable ThrottlerPtr merges_throttler; /// A server-wide throttler for merges

mutable ThrottlerPtr exports_throttler; /// A server-wide throttler for exports

MultiVersion<Macros> macros; /// Substitutions extracted from config.
std::unique_ptr<DDLWorker> ddl_worker TSA_GUARDED_BY(mutex); /// Process ddl commands from zk.
LoadTaskPtr ddl_worker_startup_task; /// To postpone `ddl_worker->startup()` after all tables startup
@@ -996,6 +999,9 @@ struct ContextSharedPart : boost::noncopyable

if (auto bandwidth = server_settings[ServerSetting::max_merges_bandwidth_for_server])
merges_throttler = std::make_shared<Throttler>(bandwidth);

if (auto bandwidth = server_settings[ServerSetting::max_exports_bandwidth_for_server])
exports_throttler = std::make_shared<Throttler>(bandwidth);
}
};

@@ -4048,6 +4054,11 @@ ThrottlerPtr Context::getMergesThrottler() const
return shared->merges_throttler;
}

ThrottlerPtr Context::getExportsThrottler() const
{
return shared->exports_throttler;
}

void Context::reloadRemoteThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const
{
if (read_bandwidth)
1 change: 1 addition & 0 deletions src/Interpreters/Context.h
@@ -1646,6 +1646,7 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>

ThrottlerPtr getMutationsThrottler() const;
ThrottlerPtr getMergesThrottler() const;
ThrottlerPtr getExportsThrottler() const;

void reloadRemoteThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const;
void reloadLocalThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const;
3 changes: 2 additions & 1 deletion src/Interpreters/DDLWorker.cpp
@@ -747,7 +747,8 @@ bool DDLWorker::taskShouldBeExecutedOnLeader(const ASTPtr & ast_ddl, const Stora
alter->isFreezeAlter() ||
alter->isUnlockSnapshot() ||
alter->isMovePartitionToDiskOrVolumeAlter() ||
alter->isCommentAlter())
alter->isCommentAlter() ||
alter->isExportPartAlter())
return false;
}

6 changes: 6 additions & 0 deletions src/Interpreters/InterpreterAlterQuery.cpp
@@ -502,6 +502,12 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
required_access.emplace_back(AccessType::ALTER_DELETE | AccessType::INSERT, database, table);
break;
}
case ASTAlterCommand::EXPORT_PART:
{
required_access.emplace_back(AccessType::ALTER_EXPORT_PART, database, table);
required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table);
break;
}
case ASTAlterCommand::FETCH_PARTITION:
{
required_access.emplace_back(AccessType::ALTER_FETCH_PARTITION, database, table);
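
Per the access checks above, the command requires ALTER EXPORT PART on the source table and INSERT on the destination. A sketch with hypothetical role and table names:

```sql
-- Sketch: both grants mirror getRequiredAccessForCommand(); identifiers are illustrative only.
GRANT ALTER EXPORT PART ON db.source_mt TO exporter_role;
GRANT INSERT ON lake_db.target_table TO exporter_role;
```
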
4 changes: 3 additions & 1 deletion src/Interpreters/PartLog.cpp
@@ -69,6 +69,7 @@ ColumnsDescription PartLogElement::getColumnsDescription()
{"MovePart", static_cast<Int8>(MOVE_PART)},
{"MergePartsStart", static_cast<Int8>(MERGE_PARTS_START)},
{"MutatePartStart", static_cast<Int8>(MUTATE_PART_START)},
{"ExportPart", static_cast<Int8>(EXPORT_PART)},
}
);

@@ -109,7 +110,8 @@ ColumnsDescription PartLogElement::getColumnsDescription()
"RemovePart — Removing or detaching a data part using [DETACH PARTITION](/sql-reference/statements/alter/partition#detach-partitionpart)."
"MutatePartStart — Mutating of a data part has started, "
"MutatePart — Mutating of a data part has finished, "
"MovePart — Moving the data part from the one disk to another one."},
"MovePart — Moving the data part from the one disk to another one, "
"ExportPart — Exporting the data part from a MergeTree table into a target table that represents external storage (e.g., object storage or a data lake)."},
{"merge_reason", std::move(merge_reason_datatype),
"The reason for the event with type MERGE_PARTS. Can have one of the following values: "
"NotAMerge — The current event has the type other than MERGE_PARTS, "
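
With the new event type registered, finished exports should show up in system.part_log. A hedged query sketch:

```sql
-- Sketch: 'ExportPart' matches the enum value added above; columns follow the existing part_log schema.
SELECT event_time, database, table, part_name, error
FROM system.part_log
WHERE event_type = 'ExportPart'
ORDER BY event_time DESC
LIMIT 10;
```
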
1 change: 1 addition & 0 deletions src/Interpreters/PartLog.h
@@ -30,6 +30,7 @@ struct PartLogElement
MOVE_PART = 6,
MERGE_PARTS_START = 7,
MUTATE_PART_START = 8,
EXPORT_PART = 9,
};

/// Copy of MergeAlgorithm since values are written to disk.
2 changes: 2 additions & 0 deletions src/Interpreters/executeDDLQueryOnCluster.cpp
@@ -53,6 +53,8 @@ bool isSupportedAlterTypeForOnClusterDDLQuery(int type)
ASTAlterCommand::ATTACH_PARTITION,
/// Usually followed by ATTACH PARTITION
ASTAlterCommand::FETCH_PARTITION,
/// Data operation that should be executed locally on each replica
ASTAlterCommand::EXPORT_PART,
/// Logical error
ASTAlterCommand::NO_TYPE,
};
28 changes: 28 additions & 0 deletions src/Parsers/ASTAlterQuery.cpp
@@ -355,6 +355,29 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett
ostr << quoteString(move_destination_name);
}
}
else if (type == ASTAlterCommand::EXPORT_PART)
{
ostr << (settings.hilite ? hilite_keyword : "") << "EXPORT PART "
<< (settings.hilite ? hilite_none : "");
partition->format(ostr, settings, state, frame);
ostr << " TO ";
switch (move_destination_type)
{
case DataDestinationType::TABLE:
ostr << "TABLE ";
if (!to_database.empty())
{
ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_database)
<< (settings.hilite ? hilite_none : "") << ".";
}
ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_table)
<< (settings.hilite ? hilite_none : "");
return;
default:
break;
}

}
else if (type == ASTAlterCommand::REPLACE_PARTITION)
{
ostr << (settings.hilite ? hilite_keyword : "") << (replace ? "REPLACE" : "ATTACH") << " PARTITION "
@@ -624,6 +647,11 @@ bool ASTAlterQuery::isMovePartitionToDiskOrVolumeAlter() const
return false;
}

bool ASTAlterQuery::isExportPartAlter() const
{
return isOneCommandTypeOnly(ASTAlterCommand::EXPORT_PART);
}


/** Get the text that identifies this element. */
String ASTAlterQuery::getID(char delim) const
3 changes: 3 additions & 0 deletions src/Parsers/ASTAlterQuery.h
@@ -71,6 +71,7 @@ class ASTAlterCommand : public IAST
FREEZE_ALL,
UNFREEZE_PARTITION,
UNFREEZE_ALL,
EXPORT_PART,

DELETE,
UPDATE,
@@ -263,6 +264,8 @@ class ASTAlterQuery : public ASTQueryWithTableAndOutput, public ASTQueryWithOnCl

bool isMovePartitionToDiskOrVolumeAlter() const;

bool isExportPartAlter() const;

bool isCommentAlter() const;

String getID(char) const override;
1 change: 1 addition & 0 deletions src/Parsers/CommonParsers.h
@@ -331,6 +331,7 @@ namespace DB
MR_MACROS(MONTHS, "MONTHS") \
MR_MACROS(MOVE_PART, "MOVE PART") \
MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \
MR_MACROS(EXPORT_PART, "EXPORT PART") \
MR_MACROS(MOVE, "MOVE") \
MR_MACROS(MS, "MS") \
MR_MACROS(MUTATION, "MUTATION") \
18 changes: 18 additions & 0 deletions src/Parsers/ParserAlterQuery.cpp
@@ -82,6 +82,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_forget_partition(Keyword::FORGET_PARTITION);
ParserKeyword s_move_partition(Keyword::MOVE_PARTITION);
ParserKeyword s_move_part(Keyword::MOVE_PART);
ParserKeyword s_export_part(Keyword::EXPORT_PART);
ParserKeyword s_drop_detached_partition(Keyword::DROP_DETACHED_PARTITION);
ParserKeyword s_drop_detached_part(Keyword::DROP_DETACHED_PART);
ParserKeyword s_fetch_partition(Keyword::FETCH_PARTITION);
@@ -535,6 +536,23 @@

command->move_destination_name = ast_space_name->as<ASTLiteral &>().value.safeGet<String>();
}
else if (s_export_part.ignore(pos, expected))
{
if (!parser_string_and_substituion.parse(pos, command_partition, expected))
return false;

command->type = ASTAlterCommand::EXPORT_PART;
command->part = true;

if (!s_to_table.ignore(pos, expected))
{
return false;
}

if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table))
return false;
command->move_destination_type = DataDestinationType::TABLE;
}
else if (s_move_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command_partition, expected))
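
Putting the parser pieces together: the part name is given as a string literal and the destination follows a TO TABLE clause. A usage sketch with hypothetical identifiers (requires the experimental setting shown earlier):

```sql
-- Sketch: part name and table identifiers are illustrative only.
ALTER TABLE db.source_mt
    EXPORT PART 'all_1_1_0'
    TO TABLE lake_db.target_table;
```
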
48 changes: 0 additions & 48 deletions src/Storages/IPartitionStrategy.cpp
@@ -256,19 +256,6 @@ ColumnPtr WildcardPartitionStrategy::computePartitionKey(const Chunk & chunk)
return block_with_partition_by_expr.getByName(actions_with_column_name.column_name).column;
}

std::string WildcardPartitionStrategy::getPathForRead(
const std::string & prefix)
{
return prefix;
}

std::string WildcardPartitionStrategy::getPathForWrite(
const std::string & prefix,
const std::string & partition_key)
{
return PartitionedSink::replaceWildcards(prefix, partition_key);
}

HiveStylePartitionStrategy::HiveStylePartitionStrategy(
KeyDescription partition_key_description_,
const Block & sample_block_,
@@ -288,41 +275,6 @@ HiveStylePartitionStrategy::HiveStylePartitionStrategy(
block_without_partition_columns = buildBlockWithoutPartitionColumns(sample_block, partition_columns_name_set);
}

std::string HiveStylePartitionStrategy::getPathForRead(const std::string & prefix)
{
return prefix + "**." + Poco::toLower(file_format);
}

std::string HiveStylePartitionStrategy::getPathForWrite(
const std::string & prefix,
const std::string & partition_key)
{
std::string path;

if (!prefix.empty())
{
path += prefix;
if (path.back() != '/')
{
path += '/';
}
}

/// Not adding '/' because buildExpressionHive() always adds a trailing '/'
path += partition_key;

/*
* File extension is toLower(format)
* This isn't ideal, but I guess multiple formats can be specified and introduced.
* So I think it is simpler to keep it this way.
*
* Or perhaps implement something like `IInputFormat::getFileExtension()`
*/
path += std::to_string(generateSnowflakeID()) + "." + Poco::toLower(file_format);

return path;
}

ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk)
{
Block block_with_partition_by_expr = sample_block.cloneWithoutColumns();
17 changes: 7 additions & 10 deletions src/Storages/IPartitionStrategy.h
@@ -29,8 +29,12 @@ struct IPartitionStrategy

virtual ColumnPtr computePartitionKey(const Chunk & chunk) = 0;

virtual std::string getPathForRead(const std::string & prefix) = 0;
virtual std::string getPathForWrite(const std::string & prefix, const std::string & partition_key) = 0;
ColumnPtr computePartitionKey(Block & block) const
{
actions_with_column_name.actions->execute(block);

return block.getByName(actions_with_column_name.column_name).column;
}

virtual ColumnRawPtrs getFormatChunkColumns(const Chunk & chunk)
{
@@ -53,6 +57,7 @@ struct IPartitionStrategy
const KeyDescription partition_key_description;
const Block sample_block;
ContextPtr context;
PartitionExpressionActionsAndColumnName actions_with_column_name;
};

/*
@@ -89,11 +94,6 @@ struct WildcardPartitionStrategy : IPartitionStrategy
WildcardPartitionStrategy(KeyDescription partition_key_description_, const Block & sample_block_, ContextPtr context_);

ColumnPtr computePartitionKey(const Chunk & chunk) override;
std::string getPathForRead(const std::string & prefix) override;
std::string getPathForWrite(const std::string & prefix, const std::string & partition_key) override;

private:
PartitionExpressionActionsAndColumnName actions_with_column_name;
};

/*
@@ -111,8 +111,6 @@ struct HiveStylePartitionStrategy : IPartitionStrategy
bool partition_columns_in_data_file_);

ColumnPtr computePartitionKey(const Chunk & chunk) override;
std::string getPathForRead(const std::string & prefix) override;
std::string getPathForWrite(const std::string & prefix, const std::string & partition_key) override;

ColumnRawPtrs getFormatChunkColumns(const Chunk & chunk) override;
Block getFormatHeader() override;
@@ -121,7 +119,6 @@
const std::string file_format;
const bool partition_columns_in_data_file;
std::unordered_set<std::string> partition_columns_name_set;
PartitionExpressionActionsAndColumnName actions_with_column_name;
Block block_without_partition_columns;
};
