Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 61 additions & 26 deletions quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2633,45 +2633,80 @@ impl MetastoreService for PostgresqlMetastore {
);

// Only delete splits that are marked for deletion
// Match the non-metrics delete_splits pattern: distinguish
// "not found" (warn + succeed) from "not deletable" (FailedPrecondition).
const DELETE_SPLITS_QUERY: &str = r#"
DELETE FROM metrics_splits
WHERE
index_uid = $1
AND split_id = ANY($2)
AND split_state = 'MarkedForDeletion'
RETURNING split_id
WITH input_splits AS (
SELECT input_splits.split_id, metrics_splits.split_state
FROM UNNEST($2::text[]) AS input_splits(split_id)
LEFT JOIN metrics_splits
ON metrics_splits.index_uid = $1
AND metrics_splits.split_id = input_splits.split_id
),
Comment on lines +2642 to +2645
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

**P2:** Lock metrics rows before deriving the `not_deletable` state

`delete_metrics_splits` computes the `input_splits` CTE from `metrics_splits` without `FOR UPDATE`, unlike the non-metrics `delete_splits` flow it is mirroring. Under concurrent `mark_metrics_splits_for_deletion` updates, this statement can evaluate an older `split_state` snapshot and return `FailedPrecondition` for splits that a racing transaction has already transitioned to `MarkedForDeletion`, causing spurious delete failures and retry churn. Adding row locking in the state-read CTE (as done in `delete_splits`) avoids this stale-state race.

Useful? React with 👍 / 👎.

deleted AS (
DELETE FROM metrics_splits
USING input_splits
WHERE
metrics_splits.index_uid = $1
AND metrics_splits.split_id = input_splits.split_id
AND NOT EXISTS (
SELECT 1 FROM input_splits
WHERE split_state IN ('Staged', 'Published')
)
RETURNING metrics_splits.split_id
)
SELECT
(SELECT COUNT(*) FROM input_splits WHERE split_state IS NOT NULL) as num_found,
(SELECT COUNT(*) FROM deleted) as num_deleted,
COALESCE(
(SELECT ARRAY_AGG(split_id) FROM input_splits
WHERE split_state IN ('Staged', 'Published')),
ARRAY[]::text[]
) as not_deletable,
COALESCE(
(SELECT ARRAY_AGG(split_id) FROM input_splits
WHERE split_state IS NULL),
ARRAY[]::text[]
) as not_found
"#;

let deleted_split_ids: Vec<String> = sqlx::query_scalar(DELETE_SPLITS_QUERY)
let (num_found, num_deleted, not_deletable_ids, not_found_ids): (
i64,
i64,
Vec<String>,
Vec<String>,
) = sqlx::query_as(DELETE_SPLITS_QUERY)
.bind(request.index_uid())
.bind(&request.split_ids)
.fetch_all(&self.connection_pool)
.fetch_one(&self.connection_pool)
.await
.map_err(|sqlx_error| convert_sqlx_err(&request.index_uid().index_id, sqlx_error))?;

// Log if some splits were not deleted (either non-existent or not
// in MarkedForDeletion state). Delete is idempotent — we don't error
// for missing splits.
if deleted_split_ids.len() != request.split_ids.len() {
let not_deleted: Vec<String> = request
.split_ids
.iter()
.filter(|id| !deleted_split_ids.contains(id))
.cloned()
.collect();
if !not_deletable_ids.is_empty() {
let message = format!(
"splits `{}` are not deletable",
not_deletable_ids.join(", ")
);
let entity = EntityKind::Splits {
split_ids: not_deletable_ids,
};
return Err(MetastoreError::FailedPrecondition { entity, message });
}

if !not_deleted.is_empty() {
warn!(
index_uid = %request.index_uid(),
not_deleted = ?not_deleted,
"some metrics splits were not deleted (non-existent or not marked for deletion)"
);
}
if !not_found_ids.is_empty() {
warn!(
index_uid = %request.index_uid(),
not_found = ?not_found_ids,
"{} metrics splits were not found and could not be deleted",
not_found_ids.len()
);
}

let _ = (num_found, num_deleted); // used by the CTE logic

info!(
index_uid = %request.index_uid(),
deleted_count = deleted_split_ids.len(),
num_deleted,
"deleted metrics splits successfully"
);
Ok(EmptyResponse {})
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-parquet-engine/src/split/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ pub struct MetricsSplitMetadata {
/// 0 for newly ingested splits.
pub num_merge_ops: u32,

/// RowKeys (sort-key min/max boundaries) as proto bytes.
/// RowKeys (sort-key min/max boundaries) as serialized proto bytes
/// (`sortschema::RowKeys` in `event_store_sortschema.proto`).
/// None for pre-Phase-31 splits or splits without sort schema.
pub row_keys_proto: Option<Vec<u8>>,

Expand Down
Loading