From 71a3a901a31397158d7b1e8afe8052f5d800b381 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Hudobski?= Date: Mon, 27 Apr 2026 17:11:29 +0200 Subject: [PATCH] validator: implement test for CQL per-row TTL Verify that rows inserted with a per-row TTL expiration value are filtered out from ANN query results after they expire. The test inserts rows with and without an expiration, creates an index, waits for expiry, and asserts that only non-TTL rows are returned. Fixes: VECTOR-598 Co-authored-by: Copilot --- crates/validator/src/lib.rs | 2 + crates/validator/src/ttl.rs | 182 ++++++++++++++++++++++++++++++++++++ 2 files changed, 184 insertions(+) create mode 100644 crates/validator/src/ttl.rs diff --git a/crates/validator/src/lib.rs b/crates/validator/src/lib.rs index e2c1f3a8..5bd0c95e 100644 --- a/crates/validator/src/lib.rs +++ b/crates/validator/src/lib.rs @@ -19,6 +19,7 @@ mod quantization_and_rescoring; mod reconnect; mod serde; mod similarity_functions; +mod ttl; use async_backtrace::framed; use e2etest::TestCase; @@ -239,6 +240,7 @@ pub async fn test_cases() -> impl Iterator) "quantization_and_rescoring", quantization_and_rescoring::new().await, ), + ("ttl", ttl::new().await), ] .into_iter() .map(|(name, test_case)| (name.to_string(), test_case)) diff --git a/crates/validator/src/ttl.rs b/crates/validator/src/ttl.rs new file mode 100644 index 00000000..d876de55 --- /dev/null +++ b/crates/validator/src/ttl.rs @@ -0,0 +1,182 @@ +/* + * Copyright 2026-present ScyllaDB + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +use crate::TestActors; +use crate::common::*; +use async_backtrace::framed; +use e2etest::TestCase; +use std::time::Duration; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; +use tracing::info; + +/// How long to wait for the expiration service to have processed +/// expired rows. With `alternator_ttl_period_in_seconds=0.5` +/// a full scan happens every 0.5 s, but allow some extra margin for CDC +/// propagation. 
+const EXPIRATION_TIMEOUT: Duration = Duration::from_secs(30);
+
+/// Returns the current wall-clock time as whole seconds since the UNIX epoch.
+///
+/// The result is an `i64` so it can be interpolated directly into the
+/// `BIGINT` `expiration` column written by the test in this module.
+///
+/// # Panics
+///
+/// Panics if the system clock is set before the UNIX epoch, or if the number
+/// of elapsed seconds does not fit in an `i64`.
+fn now_epoch_secs() -> i64 {
+    let since_epoch = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .expect("system time before UNIX epoch");
+    // Checked conversion: a plain `as i64` cast would wrap silently.
+    i64::try_from(since_epoch.as_secs()).expect("epoch seconds do not fit in i64")
+}
+
+/// Builds the TTL test case.
+///
+/// The case uses [`init_ttl`] as its init step, `cleanup` as its cleanup
+/// step, and registers the single test
+/// `cql_per_row_ttl_expires_from_index`. Every step runs with
+/// `DEFAULT_TEST_TIMEOUT`.
+#[framed]
+pub(crate) async fn new() -> TestCase {
+    let timeout = DEFAULT_TEST_TIMEOUT;
+    TestCase::empty()
+        .with_init(timeout, init_ttl)
+        .with_cleanup(timeout, cleanup)
+        .with_test(
+            "cql_per_row_ttl_expires_from_index",
+            timeout,
+            cql_per_row_ttl_expires_from_index,
+        )
+}
+
+/// Custom init that starts ScyllaDB with a low TTL scan period so that the
+/// expiration service picks up expired rows quickly.
+///
+/// Takes the default ScyllaDB node configs, appends
+/// `--alternator-ttl-period-in-seconds=0.5` to the arguments of every node,
+/// and initializes the cluster together with the default vector-store node
+/// configs.
+#[framed]
+async fn init_ttl(actors: TestActors) {
+    info!("started");
+
+    let mut scylla_configs = get_default_scylla_node_configs(&actors).await;
+    for cfg in &mut scylla_configs {
+        cfg.args
+            .push("--alternator-ttl-period-in-seconds=0.5".to_string());
+    }
+    let vs_configs = get_default_vs_node_configs(&actors).await;
+    init_with_config(actors, scylla_configs, vs_configs).await;
+
+    info!("finished");
+}
+
+/// Test that rows inserted with CQL per-row TTL are not returned by ANN
+/// queries after they expire, and that the index count decrements accordingly.
+///
+/// 1. Create a table with a TTL column and insert rows — some expiring a few
+///    seconds after insertion, some with no expiration at all.
+/// 2. Create an index and verify all rows are indexed and queryable via ANN
+///    before the expiration time is reached.
+/// 3. Wait for the expiration service to delete expired rows (via CDC).
+/// 4. Verify the index count drops and ANN queries return only non-TTL rows.
+#[framed] +async fn cql_per_row_ttl_expires_from_index(actors: TestActors) { + info!("started"); + + let (session, clients) = prepare_connection(&actors).await; + + let keyspace = create_keyspace(&session).await; + let table = create_table( + &session, + "pk INT PRIMARY KEY, v VECTOR, expiration BIGINT TTL", + None, + ) + .await; + + // Expire 5 seconds from now — enough time to build the index and + // observe all 5 rows before the expiration service deletes them. + let expire_at = now_epoch_secs() + 5; + + info!("Insert 3 rows with near-future expiration and 2 rows without expiration"); + for pk in 0..3 { + session + .query_unpaged( + format!( + "INSERT INTO {table} (pk, v, expiration) VALUES ({pk}, [{v}, 0.0, 0.0], {expire_at})", + v = pk as f32, + ), + (), + ) + .await + .expect("failed to insert data with TTL"); + } + for pk in 10..12 { + session + .query_unpaged( + format!( + "INSERT INTO {table} (pk, v) VALUES ({pk}, [{v}, 1.0, 1.0])", + v = pk as f32, + ), + (), + ) + .await + .expect("failed to insert data without TTL"); + } + + let index = create_index(CreateIndexQuery::new(&session, &clients, &table, "v")).await; + + for client in &clients { + let index_status = wait_for_index(client, &index).await; + assert_eq!( + index_status.count, 5, + "Expected 5 vectors to be indexed before TTL expiry" + ); + } + + info!("Verify all 5 rows are returned before expiration"); + let result = wait_for_value( + || async { + let result = get_opt_query_results( + format!("SELECT pk FROM {table} ORDER BY v ANN OF [0.0, 0.0, 0.0] LIMIT 10"), + &session, + ) + .await; + result.filter(|r| r.rows_num() == 5) + }, + "Waiting for ANN query to return all 5 rows", + DEFAULT_OPERATION_TIMEOUT, + ) + .await; + assert_eq!(result.rows_num(), 5, "Expected 5 rows before expiration"); + + info!("Wait for index count to drop after expiration service runs"); + for client in &clients { + wait_for( + || async { + let status = client.index_status(&index.keyspace, &index.index).await; + 
matches!(status, Ok(s) if s.count == 2) + }, + "Waiting for expired rows to be removed from index", + EXPIRATION_TIMEOUT, + ) + .await; + } + + info!("Verify ANN query returns only the non-TTL rows after expiration"); + let result = wait_for_value( + || async { + let result = get_opt_query_results( + format!("SELECT pk FROM {table} ORDER BY v ANN OF [10.0, 1.0, 1.0] LIMIT 10"), + &session, + ) + .await; + result.filter(|r| r.rows_num() == 2) + }, + "Waiting for ANN query to return only non-TTL rows", + DEFAULT_OPERATION_TIMEOUT, + ) + .await; + let rows: Vec = result + .rows::<(i32,)>() + .expect("failed to get rows") + .map(|row| row.expect("failed to get row").0) + .collect(); + assert_eq!(rows.len(), 2, "Expected 2 rows after expiration"); + for pk in &rows { + assert!( + *pk >= 10 && *pk < 12, + "Expected only non-TTL rows (pk=10,11), got pk={pk}" + ); + } + + session + .query_unpaged(format!("DROP KEYSPACE {keyspace}"), ()) + .await + .expect("failed to drop a keyspace"); + + info!("finished"); +}