4 changes: 2 additions & 2 deletions src/Disks/ObjectStorages/IObjectStorage.cpp
@@ -118,7 +118,7 @@ PathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string
if (!json)
return;

successfully_parsed = true;
is_valid = true;

if (json->has("retry_after_us"))
retry_after_us = json->getValue<size_t>("retry_after_us");
@@ -129,7 +129,7 @@ PathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string
}
}

std::string PathWithMetadata::CommandInTaskResponse::to_string() const
std::string PathWithMetadata::CommandInTaskResponse::toString() const
{
Poco::JSON::Object json;
if (retry_after_us.has_value())
16 changes: 10 additions & 6 deletions src/Disks/ObjectStorages/IObjectStorage.h
@@ -121,15 +121,19 @@ struct PathWithMetadata
CommandInTaskResponse() = default;
explicit CommandInTaskResponse(const std::string & task);

bool is_parsed() const { return successfully_parsed; }
void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }
bool isValid() const { return is_valid; }
void setRetryAfterUs(Poco::Timestamp::TimeDiff time_us)
{
retry_after_us = time_us;
is_valid = true;
}

std::string to_string() const;
std::string toString() const;

std::optional<Poco::Timestamp::TimeDiff> get_retry_after_us() const { return retry_after_us; }
std::optional<Poco::Timestamp::TimeDiff> getRetryAfterUs() const { return retry_after_us; }

private:
bool successfully_parsed = false;
bool is_valid = false;
std::optional<Poco::Timestamp::TimeDiff> retry_after_us;
};

@@ -158,7 +162,7 @@ struct PathWithMetadata
, absolute_path((absolute_path_.has_value() && !absolute_path_.value().empty()) ? absolute_path_ : std::nullopt)
, object_storage_to_use(object_storage_to_use_)
{
if (command.is_parsed())
if (command.isValid())
relative_path = "";
}

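For context on the renamed CommandInTaskResponse API above, here is a minimal standalone sketch of the same JSON round trip done directly with Poco. The field name retry_after_us matches the parser in IObjectStorage.cpp; the sample value and the surrounding program are illustrative only.

```cpp
#include <Poco/Exception.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <iostream>
#include <optional>
#include <sstream>

int main()
{
    // Producer side (task distributor): encode "retry later" as a JSON command
    // and ship it in place of an object path, as setRetryAfterUs()/toString() do.
    Poco::JSON::Object command;
    command.set("retry_after_us", 250000);
    std::ostringstream out;
    command.stringify(out);
    const std::string task = out.str();  // e.g. {"retry_after_us":250000}

    // Consumer side (worker): try to parse the task as a command, mirroring the
    // CommandInTaskResponse(const std::string &) constructor above.
    std::optional<size_t> retry_after_us;
    try
    {
        Poco::JSON::Parser parser;
        auto json = parser.parse(task).extract<Poco::JSON::Object::Ptr>();
        if (json && json->has("retry_after_us"))
            retry_after_us = json->getValue<size_t>("retry_after_us");
    }
    catch (const Poco::Exception &)
    {
        // Not a command: treat the task as a regular path (isValid() stays false).
    }

    if (retry_after_us)
        std::cout << "defer the request for " << *retry_after_us << " us\n";
}
```

StorageObjectStorageSource below performs the consumer half of this exchange via isValid() and getRetryAfterUs().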
11 changes: 8 additions & 3 deletions src/Interpreters/ClusterFunctionReadTask.cpp
@@ -33,9 +33,14 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o

file_meta_info = object->file_meta_info;

const bool send_over_whole_archive = !context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes];
path = send_over_whole_archive ? object->getPathOrPathToArchiveIfArchive() : object->getPath();
absolute_path = object->getAbsolutePath();
if (object->getCommand().isValid())
path = object->getCommand().toString();
else
{
const bool send_over_whole_archive = !context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes];
path = send_over_whole_archive ? object->getPathOrPathToArchiveIfArchive() : object->getPath();
absolute_path = object->getAbsolutePath();
}
}

ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(const std::string & path_)
4 changes: 2 additions & 2 deletions src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
@@ -498,9 +498,9 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
if (!object_info)
return {};

if (object_info->getCommand().is_parsed())
if (object_info->getCommand().isValid())
{
auto retry_after_us = object_info->getCommand().get_retry_after_us();
auto retry_after_us = object_info->getCommand().getRetryAfterUs();
if (retry_after_us.has_value())
{
not_a_path = true;
@@ -24,9 +24,13 @@ StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistrib
, lock_object_storage_task_distribution_us(lock_object_storage_task_distribution_ms_ * 1000)
, iterator_exhausted(false)
{
Poco::Timestamp now;
size_t nodes = ids_of_nodes.size();
for (size_t i = 0; i < nodes; ++i)
{
replica_to_files_to_be_processed[i] = std::list<ObjectInfoPtr>{};
last_node_activity[i] = now;
}
}

ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t number_of_current_replica)
@@ -213,20 +217,23 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s

while (it != unprocessed_files.end())
{
auto last_activity = last_node_activity.find(it->second.second);
if (lock_object_storage_task_distribution_us <= 0
|| last_activity == last_node_activity.end()
|| activity_limit > last_activity->second)
auto number_of_matched_replica = it->second.second;
auto last_activity = last_node_activity.find(number_of_matched_replica);
if (lock_object_storage_task_distribution_us <= 0 // file deferral is turned off
|| number_of_matched_replica == number_of_current_replica // file matches the current replica
|| last_activity == last_node_activity.end() // must never happen, last_activity is filled for each replica on start
|| activity_limit > last_activity->second) // matched replica has not asked for new files for a while
{
auto next_file = it->second.first;
unprocessed_files.erase(it);

auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getAbsolutePath().value_or(next_file->getPath());
LOG_TRACE(
log,
"Iterator exhausted. Assigning unprocessed file {} to replica {}",
"Iterator exhausted. Assigning unprocessed file {} to replica {} from matched replica {}",
file_path,
number_of_current_replica
number_of_current_replica,
number_of_matched_replica
);

return next_file;
@@ -246,8 +253,8 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s
/// All unprocessed files are owned by alive replicas with recent activity
/// Need to retry after (oldest_activity - activity_limit) microseconds
PathWithMetadata::CommandInTaskResponse response;
response.set_retry_after_us(oldest_activity - activity_limit);
return std::make_shared<ObjectInfo>(response.to_string());
response.setRetryAfterUs(oldest_activity - activity_limit);
return std::make_shared<ObjectInfo>(response.toString());
}

return {};
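A rough numeric walk-through of the deferral above. The formula retry_after_us = oldest_activity - activity_limit comes from the comment in getAnyUnprocessedFile; the assumption that activity_limit = now - lock_object_storage_task_distribution_us and all concrete numbers are illustrative.

```cpp
#include <cstdint>
#include <iostream>

int main()
{
    // All timestamps are Poco::Timestamp-style microseconds; the values are made up.
    const int64_t lock_us = 30000LL * 1000;   // lock_object_storage_task_distribution_ms = 30000
    const int64_t now = 10'000'000'000LL;     // "current" time

    // Assumption: activity_limit marks the start of the lock window.
    const int64_t activity_limit = now - lock_us;

    // The least recently active owner of an unprocessed file asked for work 12 s ago,
    // which is still inside the 30 s window, so its files stay deferred.
    const int64_t oldest_activity = now - 12'000'000LL;

    if (oldest_activity > activity_limit)
        // Tell the requesting replica to retry once the oldest lock can expire.
        std::cout << "retry_after_us = " << (oldest_activity - activity_limit) << "\n";  // 18000000
}
```

With these numbers the requesting replica is told to retry in 18 seconds, after which the 30-second lock on the oldest owner's files has expired.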
99 changes: 83 additions & 16 deletions tests/integration/test_s3_cache_locality/test.py
@@ -22,8 +22,8 @@ def create_buckets_s3(cluster, files=1000):
s3_data = []

for file_number in range(files):
file_name = f"data/generated/file_{file_number}.csv"
os.makedirs(os.path.join(SCRIPT_DIR, "data/generated/"), exist_ok=True)
file_name = f"data/generated_{files}/file_{file_number}.csv"
os.makedirs(os.path.join(SCRIPT_DIR, f"data/generated_{files}/"), exist_ok=True)
s3_data.append(file_name)
with open(os.path.join(SCRIPT_DIR, file_name), "w+", encoding="utf-8") as f:
# a String, b UInt64
@@ -69,15 +69,17 @@ def started_cluster():
logging.info("Cluster started")

create_buckets_s3(cluster)
create_buckets_s3(cluster, files=3)

yield cluster
finally:
shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/"), ignore_errors=True)
shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated_1000/"), ignore_errors=True)
shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated_3/"), ignore_errors=True)
cluster.shutdown()


def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache,
lock_object_storage_task_distribution_ms):
lock_object_storage_task_distribution_ms, files=1000):
for host in list(cluster.instances.values()):
host.query("SYSTEM DROP FILESYSTEM CACHE 'raw_s3_cache'", ignore_error=True)

@@ -92,7 +94,7 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
result_first = node.query(
f"""
SELECT count(*)
FROM s3Cluster('{cluster_first}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
FROM s3Cluster('{cluster_first}', 'http://minio1:9001/root/data/generated_{files}/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
WHERE b=42
SETTINGS {",".join(f"{k}={v}" for k, v in settings.items())}
""",
@@ -103,7 +105,7 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
result_second = node.query(
f"""
SELECT count(*)
FROM s3Cluster('{cluster_second}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
FROM s3Cluster('{cluster_second}', 'http://minio1:9001/root/data/generated_{files}/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
WHERE b=42
SETTINGS {",".join(f"{k}={v}" for k, v in settings.items())}
""",
@@ -134,6 +136,40 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
return int(s3_get_first), int(s3_get_second)


def check_s3_gets_by_hosts(cluster, node, expected_result,
lock_object_storage_task_distribution_ms, files=1000):
settings = {
"enable_filesystem_cache": False,
}

settings["lock_object_storage_task_distribution_ms"] = lock_object_storage_task_distribution_ms
query_id = str(uuid.uuid4())
result = node.query(
f"""
SELECT count(*)
FROM s3Cluster('{cluster}', 'http://minio1:9001/root/data/generated_{files}/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
WHERE b=42
SETTINGS {",".join(f"{k}={v}" for k, v in settings.items())}
""",
query_id=query_id,
)
assert result == expected_result

node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster}")

s3_get = node.query(
f"""
SELECT ProfileEvents['S3GetObject']
FROM clusterAllReplicas('{cluster}', system.query_log)
WHERE type='QueryFinish'
AND initial_query_id='{query_id}'
ORDER BY hostname
""",
)

return [int(events) for events in s3_get.strip().split("\n")]


def check_s3_gets_repeat(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache,
lock_object_storage_task_distribution_ms):
# Repeat test several times to get average result
@@ -154,7 +190,7 @@ def test_cache_locality(started_cluster, lock_object_storage_task_distribution_m
expected_result = node.query(
f"""
SELECT count(*)
FROM s3('http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
FROM s3('http://minio1:9001/root/data/generated_1000/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
WHERE b=42
"""
)
@@ -170,26 +206,57 @@ def test_cache_locality(started_cluster, lock_object_storage_task_distribution_m
(s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 1, lock_object_storage_task_distribution_ms)
assert s3_get_second <= s3_get_first * dispersion

# Different nodes order
# Different replica order
(s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_34512', 1, lock_object_storage_task_distribution_ms)
assert s3_get_second <= s3_get_first * dispersion

# No last node
# No last replica
(s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_1234', 1, lock_object_storage_task_distribution_ms)
assert s3_get_second <= s3_get_first * (0.211 + dispersion) # actual value - 24 for 100 files, 211 for 1000
assert s3_get_second <= s3_get_first * (0.179 + dispersion) # actual value - 179 of 1000 files changed replica

# No first node
# No first replica
(s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_2345', 1, lock_object_storage_task_distribution_ms)
assert s3_get_second <= s3_get_first * (0.189 + dispersion) # actual value - 12 for 100 files, 189 for 1000
assert s3_get_second <= s3_get_first * (0.189 + dispersion) # actual value - 189 of 1000 files changed replica

# No first node, different nodes order
# No first replica, different replica order
(s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_4523', 1, lock_object_storage_task_distribution_ms)
assert s3_get_second <= s3_get_first * (0.189 + dispersion)

# Add new node, different nodes order
# Add new replica, different replica order
(s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_4523', 'cluster_12345', 1, lock_object_storage_task_distribution_ms)
assert s3_get_second <= s3_get_first * (0.189 + dispersion)

# New node and old node, different nodes order
# New replica and old replica, different replica order
# All files from the removed replica changed replica
# Some files from the existing replicas moved to the new replica
(s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_1234', 'cluster_4523', 1, lock_object_storage_task_distribution_ms)
assert s3_get_second <= s3_get_first * (0.400 + dispersion) # actual value - 36 for 100 files, 400 for 1000
assert s3_get_second <= s3_get_first * (0.368 + dispersion) # actual value - 368 of 1000 files changed replica

if lock_object_storage_task_distribution_ms > 0:
s3_get = check_s3_gets_by_hosts('cluster_12345', node, expected_result, lock_object_storage_task_distribution_ms, files=1000)
assert s3_get == [189,210,220,202,179]
s3_get = check_s3_gets_by_hosts('cluster_1234', node, expected_result, lock_object_storage_task_distribution_ms, files=1000)
assert s3_get == [247,243,264,246]
s3_get = check_s3_gets_by_hosts('cluster_2345', node, expected_result, lock_object_storage_task_distribution_ms, files=1000)
assert s3_get == [251,280,248,221]


def test_cache_locality_few_files(started_cluster):
node = started_cluster.instances["clickhouse0"]

expected_result = node.query(
f"""
SELECT count(*)
FROM s3('http://minio1:9001/root/data/generated_3/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
WHERE b=42
"""
)

# Rendezvous hashing produces the following distribution:
# file_0 - clickhouse1
# file_1 - clickhouse4
# file_2 - clickhouse3
# The same distribution must hold for every query
for _ in range(10):
s3_get = check_s3_gets_by_hosts('cluster_12345', node, expected_result, lock_object_storage_task_distribution_ms=30000, files=3)
assert s3_get == [1,0,1,1,0]
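The fixed file-to-replica assignment asserted in this test comes from rendezvous (highest-random-weight) hashing. Below is a minimal sketch of the idea, using std::hash as a stand-in for whatever hash function the server actually uses, so the concrete winners will differ from the comment above.

```cpp
#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Each file independently scores every replica and picks the highest score, so the
// mapping is stable across queries and only files owned by a removed replica move.
size_t pickReplica(const std::string & file, const std::vector<std::string> & replicas)
{
    size_t best = 0;
    size_t best_score = 0;
    for (size_t i = 0; i < replicas.size(); ++i)
    {
        const size_t score = std::hash<std::string>{}(file + "|" + replicas[i]);
        if (i == 0 || score > best_score)
        {
            best_score = score;
            best = i;
        }
    }
    return best;
}

int main()
{
    const std::vector<std::string> replicas
        = {"clickhouse1", "clickhouse2", "clickhouse3", "clickhouse4", "clickhouse5"};
    for (const std::string file : {"file_0.csv", "file_1.csv", "file_2.csv"})
        std::cout << file << " -> " << replicas[pickReplica(file, replicas)] << "\n";
}
```

Because every file scores all replicas independently, removing one replica only moves the files that replica owned, which is what the cluster_1234 and cluster_2345 assertions above rely on.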
@@ -0,0 +1,7 @@
<clickhouse>
<profiles>
<default>
<lock_object_storage_task_distribution_ms>0</lock_object_storage_task_distribution_ms>
</default>
</profiles>
</clickhouse>
2 changes: 2 additions & 0 deletions tests/integration/test_storage_s3/test.py
@@ -74,6 +74,7 @@ def started_cluster():
"configs/access.xml",
"configs/users.xml",
"configs/s3_retry.xml",
"configs/lock_object_storage_task_distribution_ms.xml",
],
)
cluster.add_instance(
@@ -122,6 +123,7 @@ def started_cluster():
"configs/users.xml",
"configs/s3_retry.xml",
"configs/process_archives_as_whole_with_cluster.xml",
"configs/lock_object_storage_task_distribution_ms.xml",
],
)
cluster.add_instance(