diff --git a/crates/modelardb_storage/src/write_ahead_log.rs b/crates/modelardb_storage/src/write_ahead_log.rs index 78e71471..f01732e8 100644 --- a/crates/modelardb_storage/src/write_ahead_log.rs +++ b/crates/modelardb_storage/src/write_ahead_log.rs @@ -290,14 +290,19 @@ impl SegmentedLog { fn try_new(folder_path: PathBuf, schema: &Schema) -> Result { std::fs::create_dir_all(&folder_path)?; - close_leftover_active_segment(&folder_path)?; + let leftover_next_id = close_leftover_active_segment(&folder_path)?; // Collect all closed segment files already on disk and sort them by start_id. let mut closed_segments = find_closed_segments(&folder_path)?; closed_segments.sort_by_key(|s| s.start_id); - // The next batch id is one past the end of the last closed segment, or 0 if there are none. - let next_id = closed_segments.last().map(|s| s.end_id + 1).unwrap_or(0); + // The next batch id is one past the end of the last closed segment, the next id + // recovered from a leftover active segment, or 0 if there is neither. + let next_id = closed_segments + .last() + .map(|s| s.end_id + 1) + .or(leftover_next_id) + .unwrap_or(0); if !closed_segments.is_empty() { debug!( @@ -532,9 +537,10 @@ impl SegmentedLog { /// If a leftover active segment (`{start_id}-.arrows`) exists in `folder_path`, rename it to /// its final `{start_id}-{end_id}.arrows` name so it is picked up as a closed segment. If the -/// file contains no batches, it is removed instead. If the file could not be renamed or -/// removed, return [`ModelarDbStorageError`]. -fn close_leftover_active_segment(folder_path: &Path) -> Result<()> { +/// file contains no batches, it is removed instead. In both cases, the next batch id implied by +/// the leftover is returned so the caller can ensure batch id continuity. If no leftover exists, +/// return `None`. If the file could not be renamed or removed, return [`ModelarDbStorageError`]. +fn close_leftover_active_segment(folder_path: &Path) -> Result> { let Some(active_path) = std::fs::read_dir(folder_path)? .filter_map(|maybe_entry| maybe_entry.ok()) .map(|entry| entry.path()) @@ -544,7 +550,7 @@ fn close_leftover_active_segment(folder_path: &Path) -> Result<()> { .is_some_and(|stem| stem.ends_with('-')) }) else { - return Ok(()); + return Ok(None); }; let stem = active_path @@ -561,6 +567,7 @@ fn close_leftover_active_segment(folder_path: &Path) -> Result<()> { if batches.is_empty() { std::fs::remove_file(&active_path)?; debug!(path = %active_path.display(), "Removed empty leftover active WAL segment."); + Ok(Some(start_id)) } else { let end_id = start_id + batches.len() as u64 - 1; let closed_path = folder_path.join(format!("{start_id}-{end_id}.arrows")); @@ -573,9 +580,8 @@ fn close_leftover_active_segment(folder_path: &Path) -> Result<()> { ); std::fs::rename(&active_path, closed_path)?; + Ok(Some(end_id + 1)) } - - Ok(()) } /// Collect all closed segment files in `folder_path`. Closed segments have names of the form @@ -1084,6 +1090,38 @@ mod tests { assert_eq!(file_count, 1); } + #[test] + fn test_reopen_with_empty_leftover_after_all_segments_persisted_preserves_batch_id() { + let temp_dir = tempfile::tempdir().unwrap(); + let folder_path = temp_dir.path().join(TIME_SERIES_TABLE_NAME); + let metadata = table::time_series_table_metadata(); + + let batch = table::uncompressed_time_series_table_record_batch(5); + + // Write enough batches to close two segments, persist all, then drop. + { + let segmented_log = + SegmentedLog::try_new(folder_path.clone(), &metadata.schema).unwrap(); + + for _ in 0..SEGMENT_BATCH_COUNT_THRESHOLD * 2 { + segmented_log.append_and_sync(&batch).unwrap(); + } + + let ids: HashSet = (0..SEGMENT_BATCH_COUNT_THRESHOLD * 2).collect(); + segmented_log.mark_batches_as_persisted(ids).unwrap(); + + // Closed segments are deleted. Only the empty active segment remains. + assert!(segmented_log.closed_segments.lock().unwrap().is_empty()); + assert!(segmented_log.all_batches().unwrap().is_empty()); + } + + // On re-open, next_batch_id must continue from where it left off. + let segmented_log = SegmentedLog::try_new(folder_path, &metadata.schema).unwrap(); + + let active = segmented_log.active_segment.lock().unwrap(); + assert_eq!(active.next_batch_id, SEGMENT_BATCH_COUNT_THRESHOLD * 2); + } + #[test] fn test_mark_batches_as_persisted_deletes_fully_persisted_segment() { let temp_dir = tempfile::tempdir().unwrap();