Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/validator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod quantization_and_rescoring;
mod reconnect;
mod serde;
mod similarity_functions;
mod ttl;

use async_backtrace::framed;
use e2etest::TestCase;
Expand Down Expand Up @@ -239,6 +240,7 @@ pub async fn test_cases() -> impl Iterator<Item = (String, TestCase<TestActors>)
"quantization_and_rescoring",
quantization_and_rescoring::new().await,
),
("ttl", ttl::new().await),
]
.into_iter()
.map(|(name, test_case)| (name.to_string(), test_case))
Expand Down
182 changes: 182 additions & 0 deletions crates/validator/src/ttl.rs
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this is a test for CDC? If so, should it go into cdc.rs?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to specify a launch option for scylla, so I'd prefer a different file

Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Copyright 2026-present ScyllaDB
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/

use crate::TestActors;
use crate::common::*;
use async_backtrace::framed;
use e2etest::TestCase;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use tracing::info;

/// How long (in seconds) before we expect the expiration service to have
/// processed expired rows. With `alternator_ttl_period_in_seconds=0.5`
/// a full scan happens every 0.5 s, but allow some extra margin for CDC
/// propagation.
const EXPIRATION_TIMEOUT: Duration = Duration::from_secs(30);

fn now_epoch_secs() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system time before UNIX epoch")
.as_secs() as i64
}

#[framed]
pub(crate) async fn new() -> TestCase<TestActors> {
let timeout = DEFAULT_TEST_TIMEOUT;
TestCase::empty()
.with_init(timeout, init_ttl)
.with_cleanup(timeout, cleanup)
.with_test(
"cql_per_row_ttl_expires_from_index",
timeout,
cql_per_row_ttl_expires_from_index,
)
}

/// Custom init that starts ScyllaDB with a low TTL scan period so that the
/// expiration service picks up expired rows quickly.
#[framed]
async fn init_ttl(actors: TestActors) {
info!("started");

let mut scylla_configs = get_default_scylla_node_configs(&actors).await;
for cfg in &mut scylla_configs {
cfg.args
.push("--alternator-ttl-period-in-seconds=0.5".to_string());
}
let vs_configs = get_default_vs_node_configs(&actors).await;
init_with_config(actors, scylla_configs, vs_configs).await;

info!("finished");
}

/// Test that rows inserted with CQL per-row TTL are not returned by ANN
/// queries after they expire, and that the index count decrements accordingly.
///
/// 1. Create a table with a TTL column and insert rows — some already expired,
/// some that will never expire.
/// 2. Create an index and verify all rows are queryable via ANN.
/// 3. Wait for the expiration service to delete expired rows (via CDC).
/// 4. Verify the index count drops and ANN queries return only non-TTL rows.
#[framed]
async fn cql_per_row_ttl_expires_from_index(actors: TestActors) {
info!("started");

let (session, clients) = prepare_connection(&actors).await;

let keyspace = create_keyspace(&session).await;
let table = create_table(
&session,
"pk INT PRIMARY KEY, v VECTOR<FLOAT, 3>, expiration BIGINT TTL",
None,
)
.await;

// Expire 5 seconds from now — enough time to build the index and
// observe all 5 rows before the expiration service deletes them.
let expire_at = now_epoch_secs() + 5;

info!("Insert 3 rows with near-future expiration and 2 rows without expiration");
for pk in 0..3 {
session
.query_unpaged(
format!(
"INSERT INTO {table} (pk, v, expiration) VALUES ({pk}, [{v}, 0.0, 0.0], {expire_at})",
v = pk as f32,
),
(),
)
.await
.expect("failed to insert data with TTL");
}
for pk in 10..12 {
session
.query_unpaged(
format!(
"INSERT INTO {table} (pk, v) VALUES ({pk}, [{v}, 1.0, 1.0])",
v = pk as f32,
),
(),
)
.await
.expect("failed to insert data without TTL");
}

let index = create_index(CreateIndexQuery::new(&session, &clients, &table, "v")).await;

for client in &clients {
let index_status = wait_for_index(client, &index).await;
assert_eq!(
index_status.count, 5,
"Expected 5 vectors to be indexed before TTL expiry"
);
}

info!("Verify all 5 rows are returned before expiration");
let result = wait_for_value(
|| async {
let result = get_opt_query_results(
format!("SELECT pk FROM {table} ORDER BY v ANN OF [0.0, 0.0, 0.0] LIMIT 10"),
&session,
)
.await;
result.filter(|r| r.rows_num() == 5)
},
"Waiting for ANN query to return all 5 rows",
DEFAULT_OPERATION_TIMEOUT,
)
.await;
assert_eq!(result.rows_num(), 5, "Expected 5 rows before expiration");

info!("Wait for index count to drop after expiration service runs");
for client in &clients {
wait_for(
|| async {
let status = client.index_status(&index.keyspace, &index.index).await;
matches!(status, Ok(s) if s.count == 2)
},
"Waiting for expired rows to be removed from index",
EXPIRATION_TIMEOUT,
)
.await;
}

info!("Verify ANN query returns only the non-TTL rows after expiration");
let result = wait_for_value(
|| async {
let result = get_opt_query_results(
format!("SELECT pk FROM {table} ORDER BY v ANN OF [10.0, 1.0, 1.0] LIMIT 10"),
&session,
)
.await;
result.filter(|r| r.rows_num() == 2)
},
"Waiting for ANN query to return only non-TTL rows",
DEFAULT_OPERATION_TIMEOUT,
)
.await;
let rows: Vec<i32> = result
.rows::<(i32,)>()
.expect("failed to get rows")
.map(|row| row.expect("failed to get row").0)
.collect();
assert_eq!(rows.len(), 2, "Expected 2 rows after expiration");
for pk in &rows {
assert!(
*pk >= 10 && *pk < 12,
"Expected only non-TTL rows (pk=10,11), got pk={pk}"
);
}

session
.query_unpaged(format!("DROP KEYSPACE {keyspace}"), ())
.await
.expect("failed to drop a keyspace");

info!("finished");
}
Loading