diff --git a/docs/en/engines/table-engines/integrations/s3queue.md b/docs/en/engines/table-engines/integrations/s3queue.md
index 7f6fb0271cca..614d42ca522f 100644
--- a/docs/en/engines/table-engines/integrations/s3queue.md
+++ b/docs/en/engines/table-engines/integrations/s3queue.md
@@ -260,6 +260,8 @@ Example:
 
 - `_path` — Path to the file.
 - `_file` — Name of the file.
+- `_size` — Size of the file.
+- `_time` — Time of the file creation.
 
 For more information about virtual columns see [here](../../../engines/table-engines/index.md#table_engines-virtual_columns).
diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp
index fd9272b3d576..8f888797cbaf 100644
--- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp
+++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp
@@ -958,11 +958,14 @@ Chunk ObjectStorageQueueSource::generateImpl()
             ProfileEvents::increment(ProfileEvents::ObjectStorageQueueReadRows, chunk.getNumRows());
             ProfileEvents::increment(ProfileEvents::ObjectStorageQueueReadBytes, chunk.bytes());
 
+            const auto & object_metadata = reader.getObjectInfo()->metadata;
+
             VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
                 chunk, read_from_format_info.requested_virtual_columns,
                 {
                     .path = path,
-                    .size = reader.getObjectInfo()->metadata->size_bytes
+                    .size = object_metadata->size_bytes,
+                    .last_modified = object_metadata->last_modified
                 }, getContext());
 
             return chunk;
diff --git a/tests/integration/helpers/s3_queue_common.py b/tests/integration/helpers/s3_queue_common.py
index 6e935c82eb8a..8d2097dff83c 100644
--- a/tests/integration/helpers/s3_queue_common.py
+++ b/tests/integration/helpers/s3_queue_common.py
@@ -283,6 +283,7 @@ def create_mv(
     mv_name=None,
     create_dst_table_first=True,
     format="column1 UInt32, column2 UInt32, column3 UInt32",
+    virtual_columns="_path String",
 ):
     if mv_name is None:
         mv_name = f"{src_table_name}_mv"
@@ -292,21 +293,29 @@ def create_mv(
         DROP TABLE IF EXISTS {mv_name};
         """
     )
+    virtual_format = ""
+    virtual_names = ""
+    virtual_columns_list = virtual_columns.split(",")
+    for column in virtual_columns_list:
+        virtual_format += f", {column}"
+        name, _ = column.strip().rsplit(" ", 1)
+        virtual_names += f", {name}"
+
     if create_dst_table_first:
         node.query(
             f"""
-            CREATE TABLE {dst_table_name} ({format}, _path String)
+            CREATE TABLE {dst_table_name} ({format} {virtual_format})
             ENGINE = MergeTree()
             ORDER BY column1;
-            CREATE MATERIALIZED VIEW {mv_name} TO {dst_table_name} AS SELECT *, _path FROM {src_table_name};
+            CREATE MATERIALIZED VIEW {mv_name} TO {dst_table_name} AS SELECT * {virtual_names} FROM {src_table_name};
             """
         )
     else:
         node.query(
             f"""
             SET allow_materialized_view_with_bad_select=1;
-            CREATE MATERIALIZED VIEW {mv_name} TO {dst_table_name} AS SELECT *, _path FROM {src_table_name};
-            CREATE TABLE {dst_table_name} ({format}, _path String)
+            CREATE MATERIALIZED VIEW {mv_name} TO {dst_table_name} AS SELECT * {virtual_names} FROM {src_table_name};
+            CREATE TABLE {dst_table_name} ({format} {virtual_format})
             ENGINE = MergeTree()
             ORDER BY column1;
             """
diff --git a/tests/integration/test_storage_s3_queue/test_0.py b/tests/integration/test_storage_s3_queue/test_0.py
index 69849df57962..04ea8471f4cc 100644
--- a/tests/integration/test_storage_s3_queue/test_0.py
+++ b/tests/integration/test_storage_s3_queue/test_0.py
@@ -5,6 +5,7 @@
 import string
 import time
 import uuid
+from datetime import datetime
 from multiprocessing.dummy import Pool
 
 import pytest
@@ -595,3 +596,46 @@ def test_multiple_tables_meta_mismatch(started_cluster):
             "keeper_path": keeper_path,
         },
     )
+
+
+def test_virtual_columns(started_cluster):
+    start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+    node = started_cluster.instances["instance"]
+    table_name = f"test_s3queue_virtual_columns_{generate_random_string()}"
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}"
+    dst_table_name = f"{table_name}_dst"
+    files_path = f"{table_name}_data"
+
+    total_values = generate_random_files(started_cluster, files_path, 1)
+    create_table(
+        started_cluster,
+        node,
+        table_name,
+        "ordered",
+        files_path,
+        additional_settings={"keeper_path": keeper_path},
+    )
+    create_mv(node, table_name, dst_table_name, virtual_columns="_path String, _file String, _size UInt64, _time DateTime")
+    expected_values = set([tuple(i) for i in total_values])
+    for i in range(20):
+        selected_values = {
+            tuple(map(int, l.split()))
+            for l in node.query(
+                f"SELECT column1, column2, column3 FROM {dst_table_name}"
+            ).splitlines()
+        }
+        if selected_values == expected_values:
+            break
+        time.sleep(1)
+    assert selected_values == expected_values
+    virtual_values = node.query(
+        f"SELECT count(), _path, _file, _size, _time FROM {dst_table_name} GROUP BY _path, _file, _size, _time"
+    ).splitlines()
+    assert len(virtual_values) > 0
+    (_, res_path, res_file, res_size, res_time) = virtual_values[0].split("\t")
+    finish_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+    assert f"{files_path}/{res_file}" == res_path
+    assert int(res_size) > 0
+    assert start_time <= res_time
+    assert res_time <= finish_time
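
A minimal end-to-end sketch of what the new virtual columns enable, mirroring the SQL that create_mv now generates. The table, view, and bucket names below are hypothetical, and the S3Queue settings are trimmed to the ones the test exercises:

    CREATE TABLE queue (column1 UInt32, column2 UInt32, column3 UInt32)
    ENGINE = S3Queue('https://mybucket.s3.amazonaws.com/data/*', 'CSV')
    SETTINGS mode = 'ordered', keeper_path = '/clickhouse/s3queue/queue';

    -- Virtual columns are not expanded by SELECT *, so the view names them
    -- explicitly; _size and _time are only populated with this patch applied.
    CREATE TABLE queue_dst
    (
        column1 UInt32, column2 UInt32, column3 UInt32,
        _path String, _file String, _size UInt64, _time DateTime
    )
    ENGINE = MergeTree() ORDER BY column1;

    CREATE MATERIALIZED VIEW queue_mv TO queue_dst AS
    SELECT *, _path, _file, _size, _time FROM queue;

Note that _time is filled from the object's last_modified metadata (see the .last_modified assignment in ObjectStorageQueueSource.cpp), so for objects written once and never rewritten it behaves as a creation time, which is what the docs wording reflects.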