Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 47 additions & 9 deletions crates/modelardb_storage/src/write_ahead_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,14 +290,19 @@ impl SegmentedLog {
fn try_new(folder_path: PathBuf, schema: &Schema) -> Result<Self> {
std::fs::create_dir_all(&folder_path)?;

close_leftover_active_segment(&folder_path)?;
let leftover_next_id = close_leftover_active_segment(&folder_path)?;

// Collect all closed segment files already on disk and sort them by start_id.
let mut closed_segments = find_closed_segments(&folder_path)?;
closed_segments.sort_by_key(|s| s.start_id);

// The next batch id is one past the end of the last closed segment, or 0 if there are none.
let next_id = closed_segments.last().map(|s| s.end_id + 1).unwrap_or(0);
// The next batch id is one past the end of the last closed segment, the next id
// recovered from a leftover active segment, or 0 if there is neither.
let next_id = closed_segments
.last()
.map(|s| s.end_id + 1)
.or(leftover_next_id)
.unwrap_or(0);

if !closed_segments.is_empty() {
debug!(
Expand Down Expand Up @@ -532,9 +537,10 @@ impl SegmentedLog {

/// If a leftover active segment (`{start_id}-.arrows`) exists in `folder_path`, rename it to
/// its final `{start_id}-{end_id}.arrows` name so it is picked up as a closed segment. If the
/// file contains no batches, it is removed instead. If the file could not be renamed or
/// removed, return [`ModelarDbStorageError`].
fn close_leftover_active_segment(folder_path: &Path) -> Result<()> {
/// file contains no batches, it is removed instead. In both cases, the next batch id implied by
/// the leftover is returned so the caller can ensure batch id continuity. If no leftover exists,
/// return `None`. If the file could not be renamed or removed, return [`ModelarDbStorageError`].
fn close_leftover_active_segment(folder_path: &Path) -> Result<Option<u64>> {
let Some(active_path) = std::fs::read_dir(folder_path)?
.filter_map(|maybe_entry| maybe_entry.ok())
.map(|entry| entry.path())
Expand All @@ -544,7 +550,7 @@ fn close_leftover_active_segment(folder_path: &Path) -> Result<()> {
.is_some_and(|stem| stem.ends_with('-'))
})
else {
return Ok(());
return Ok(None);
};

let stem = active_path
Expand All @@ -561,6 +567,7 @@ fn close_leftover_active_segment(folder_path: &Path) -> Result<()> {
if batches.is_empty() {
std::fs::remove_file(&active_path)?;
debug!(path = %active_path.display(), "Removed empty leftover active WAL segment.");
Ok(Some(start_id))
} else {
let end_id = start_id + batches.len() as u64 - 1;
let closed_path = folder_path.join(format!("{start_id}-{end_id}.arrows"));
Expand All @@ -573,9 +580,8 @@ fn close_leftover_active_segment(folder_path: &Path) -> Result<()> {
);

std::fs::rename(&active_path, closed_path)?;
Ok(Some(end_id + 1))
}

Ok(())
}

/// Collect all closed segment files in `folder_path`. Closed segments have names of the form
Expand Down Expand Up @@ -1084,6 +1090,38 @@ mod tests {
assert_eq!(file_count, 1);
}

#[test]
fn test_reopen_with_empty_leftover_after_all_segments_persisted_preserves_batch_id() {
let temp_dir = tempfile::tempdir().unwrap();
let folder_path = temp_dir.path().join(TIME_SERIES_TABLE_NAME);
let metadata = table::time_series_table_metadata();

let batch = table::uncompressed_time_series_table_record_batch(5);

// Write enough batches to close two segments, persist all, then drop.
{
let segmented_log =
SegmentedLog::try_new(folder_path.clone(), &metadata.schema).unwrap();

for _ in 0..SEGMENT_BATCH_COUNT_THRESHOLD * 2 {
segmented_log.append_and_sync(&batch).unwrap();
}

let ids: HashSet<u64> = (0..SEGMENT_BATCH_COUNT_THRESHOLD * 2).collect();
segmented_log.mark_batches_as_persisted(ids).unwrap();

// Closed segments are deleted. Only the empty active segment remains.
assert!(segmented_log.closed_segments.lock().unwrap().is_empty());
assert!(segmented_log.all_batches().unwrap().is_empty());
}

// On re-open, next_batch_id must continue from where it left off.
let segmented_log = SegmentedLog::try_new(folder_path, &metadata.schema).unwrap();

let active = segmented_log.active_segment.lock().unwrap();
assert_eq!(active.next_batch_id, SEGMENT_BATCH_COUNT_THRESHOLD * 2);
}

#[test]
fn test_mark_batches_as_persisted_deletes_fully_persisted_segment() {
let temp_dir = tempfile::tempdir().unwrap();
Expand Down
Loading