diff --git a/crates/validator/src/lib.rs b/crates/validator/src/lib.rs index e2c1f3a8..5bd0c95e 100644 --- a/crates/validator/src/lib.rs +++ b/crates/validator/src/lib.rs @@ -19,6 +19,7 @@ mod quantization_and_rescoring; mod reconnect; mod serde; mod similarity_functions; +mod ttl; use async_backtrace::framed; use e2etest::TestCase; @@ -239,6 +240,7 @@ pub async fn test_cases() -> impl Iterator) "quantization_and_rescoring", quantization_and_rescoring::new().await, ), + ("ttl", ttl::new().await), ] .into_iter() .map(|(name, test_case)| (name.to_string(), test_case)) diff --git a/crates/validator/src/ttl.rs b/crates/validator/src/ttl.rs new file mode 100644 index 00000000..d876de55 --- /dev/null +++ b/crates/validator/src/ttl.rs @@ -0,0 +1,182 @@ +/* + * Copyright 2026-present ScyllaDB + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +use crate::TestActors; +use crate::common::*; +use async_backtrace::framed; +use e2etest::TestCase; +use std::time::Duration; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; +use tracing::info; + +/// How long (in seconds) before we expect the expiration service to have +/// processed expired rows. With `alternator_ttl_period_in_seconds=0.5` +/// a full scan happens every 0.5 s, but allow some extra margin for CDC +/// propagation. +const EXPIRATION_TIMEOUT: Duration = Duration::from_secs(30); + +fn now_epoch_secs() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system time before UNIX epoch") + .as_secs() as i64 +} + +#[framed] +pub(crate) async fn new() -> TestCase { + let timeout = DEFAULT_TEST_TIMEOUT; + TestCase::empty() + .with_init(timeout, init_ttl) + .with_cleanup(timeout, cleanup) + .with_test( + "cql_per_row_ttl_expires_from_index", + timeout, + cql_per_row_ttl_expires_from_index, + ) +} + +/// Custom init that starts ScyllaDB with a low TTL scan period so that the +/// expiration service picks up expired rows quickly. +#[framed] +async fn init_ttl(actors: TestActors) { + info!("started"); + + let mut scylla_configs = get_default_scylla_node_configs(&actors).await; + for cfg in &mut scylla_configs { + cfg.args + .push("--alternator-ttl-period-in-seconds=0.5".to_string()); + } + let vs_configs = get_default_vs_node_configs(&actors).await; + init_with_config(actors, scylla_configs, vs_configs).await; + + info!("finished"); +} + +/// Test that rows inserted with CQL per-row TTL are not returned by ANN +/// queries after they expire, and that the index count decrements accordingly. +/// +/// 1. Create a table with a TTL column and insert rows — some already expired, +/// some that will never expire. +/// 2. Create an index and verify all rows are queryable via ANN. +/// 3. Wait for the expiration service to delete expired rows (via CDC). +/// 4. Verify the index count drops and ANN queries return only non-TTL rows. +#[framed] +async fn cql_per_row_ttl_expires_from_index(actors: TestActors) { + info!("started"); + + let (session, clients) = prepare_connection(&actors).await; + + let keyspace = create_keyspace(&session).await; + let table = create_table( + &session, + "pk INT PRIMARY KEY, v VECTOR, expiration BIGINT TTL", + None, + ) + .await; + + // Expire 5 seconds from now — enough time to build the index and + // observe all 5 rows before the expiration service deletes them. + let expire_at = now_epoch_secs() + 5; + + info!("Insert 3 rows with near-future expiration and 2 rows without expiration"); + for pk in 0..3 { + session + .query_unpaged( + format!( + "INSERT INTO {table} (pk, v, expiration) VALUES ({pk}, [{v}, 0.0, 0.0], {expire_at})", + v = pk as f32, + ), + (), + ) + .await + .expect("failed to insert data with TTL"); + } + for pk in 10..12 { + session + .query_unpaged( + format!( + "INSERT INTO {table} (pk, v) VALUES ({pk}, [{v}, 1.0, 1.0])", + v = pk as f32, + ), + (), + ) + .await + .expect("failed to insert data without TTL"); + } + + let index = create_index(CreateIndexQuery::new(&session, &clients, &table, "v")).await; + + for client in &clients { + let index_status = wait_for_index(client, &index).await; + assert_eq!( + index_status.count, 5, + "Expected 5 vectors to be indexed before TTL expiry" + ); + } + + info!("Verify all 5 rows are returned before expiration"); + let result = wait_for_value( + || async { + let result = get_opt_query_results( + format!("SELECT pk FROM {table} ORDER BY v ANN OF [0.0, 0.0, 0.0] LIMIT 10"), + &session, + ) + .await; + result.filter(|r| r.rows_num() == 5) + }, + "Waiting for ANN query to return all 5 rows", + DEFAULT_OPERATION_TIMEOUT, + ) + .await; + assert_eq!(result.rows_num(), 5, "Expected 5 rows before expiration"); + + info!("Wait for index count to drop after expiration service runs"); + for client in &clients { + wait_for( + || async { + let status = client.index_status(&index.keyspace, &index.index).await; + matches!(status, Ok(s) if s.count == 2) + }, + "Waiting for expired rows to be removed from index", + EXPIRATION_TIMEOUT, + ) + .await; + } + + info!("Verify ANN query returns only the non-TTL rows after expiration"); + let result = wait_for_value( + || async { + let result = get_opt_query_results( + format!("SELECT pk FROM {table} ORDER BY v ANN OF [10.0, 1.0, 1.0] LIMIT 10"), + &session, + ) + .await; + result.filter(|r| r.rows_num() == 2) + }, + "Waiting for ANN query to return only non-TTL rows", + DEFAULT_OPERATION_TIMEOUT, + ) + .await; + let rows: Vec = result + .rows::<(i32,)>() + .expect("failed to get rows") + .map(|row| row.expect("failed to get row").0) + .collect(); + assert_eq!(rows.len(), 2, "Expected 2 rows after expiration"); + for pk in &rows { + assert!( + *pk >= 10 && *pk < 12, + "Expected only non-TTL rows (pk=10,11), got pk={pk}" + ); + } + + session + .query_unpaged(format!("DROP KEYSPACE {keyspace}"), ()) + .await + .expect("failed to drop a keyspace"); + + info!("finished"); +}