diff --git a/Cargo.lock b/Cargo.lock index 1748189..bcf2813 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -297,6 +297,7 @@ dependencies = [ "serde", "serde_json", "tempfile", + "tokio", "uuid", ] @@ -948,6 +949,12 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project-lite" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" + [[package]] name = "portable-atomic" version = "1.13.1" @@ -1473,6 +1480,15 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" +[[package]] +name = "tokio" +version = "1.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" +dependencies = [ + "pin-project-lite", +] + [[package]] name = "typenum" version = "1.19.0" diff --git a/Cargo.toml b/Cargo.toml index 8890f35..629fbb9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ clap = { version = "4.6.0", features = ["derive"] } ratatui = "0.30.0" serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0.149" +tokio = { version = "1.48.0", features = ["rt", "sync", "time"] } uuid = { version = "1.22.0", features = ["v4", "serde"] } [dev-dependencies] diff --git a/src/cli/clean/render.rs b/src/cli/clean/render.rs index ca9b013..1971d0a 100644 --- a/src/cli/clean/render.rs +++ b/src/cli/clean/render.rs @@ -72,6 +72,16 @@ impl CleanAnimation { render_sections(&self.sections, true) } + pub fn tick(&mut self) -> bool { + let mut changed = false; + + for section in &mut self.sections { + changed |= tick_in_flight(&mut section.root); + } + + changed + } + pub fn prime_resume( &mut self, restacked_branches: &[RestackPreview], @@ -146,6 +156,16 @@ fn clear_in_flight(node: &mut VisualNode) { } } +fn tick_in_flight(node: &mut VisualNode) -> bool { + let mut changed = node.status.tick(); + + for child in &mut node.children { + changed |= tick_in_flight(child); + } + + changed +} + #[cfg(test)] mod tests { use super::CleanAnimation; @@ -306,4 +326,54 @@ mod tests { ) ); } + + #[test] + fn tick_advances_in_flight_throbber_without_changing_progress() { + let mut animation = CleanAnimation::new(&CleanPlan { + trunk_branch: "main".into(), + current_branch: "feat/auth".into(), + requested_branch_name: Some("feat/auth".into()), + candidates: vec![CleanCandidate { + node_id: Uuid::new_v4(), + branch_name: "feat/auth".into(), + parent_branch_name: "main".into(), + reason: CleanReason::IntegratedIntoParent { + parent_base: RestackBaseTarget::local("main"), + }, + tree: CleanTreeNode { + branch_name: "feat/auth".into(), + children: vec![CleanTreeNode { + branch_name: "feat/auth-api".into(), + children: vec![], + }], + }, + restack_plan: vec![], + depth: 0, + }], + blocked: vec![], + }); + + animation.apply_event(&CleanEvent::RebaseStarted { + branch_name: "feat/auth-api".into(), + onto_branch: "main".into(), + }); + animation.apply_event(&CleanEvent::RebaseProgress { + branch_name: "feat/auth-api".into(), + onto_branch: "main".into(), + current_commit: 2, + total_commits: 5, + }); + + let before = animation.render_active(); + + assert!(animation.tick()); + + let after = animation.render_active(); + + assert!(before.contains("\u{1b}[34m/\u{1b}[0m")); + assert!(after.contains("\u{1b}[34m-\u{1b}[0m")); + assert!(before.contains("[2/5]")); + assert!(after.contains("[2/5]")); + assert!(after.contains("feat/auth-api")); + } } diff --git a/src/cli/operation/mod.rs b/src/cli/operation/mod.rs index b57855f..b1083b1 100644 --- a/src/cli/operation/mod.rs +++ b/src/cli/operation/mod.rs @@ -135,6 +135,15 @@ impl BranchStatus { total_commits: Some(total_commits), } } + + pub fn tick(&mut self) -> bool { + let Self::InFlight { frame_index, .. } = self else { + return false; + }; + + *frame_index = (*frame_index + 1) % markers::THROBBER_FRAMES.len(); + true + } } pub fn render_sections(sections: &[OperationSection], final_view: bool) -> String { diff --git a/src/cli/sync/mod.rs b/src/cli/sync/mod.rs index 1f2b8e7..aee039e 100644 --- a/src/cli/sync/mod.rs +++ b/src/cli/sync/mod.rs @@ -2,16 +2,22 @@ mod render; use std::io; use std::io::IsTerminal; +use std::time::Duration; use clap::Args; +use tokio::runtime::{Builder, Runtime}; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TryRecvError; +use tokio::time; use crate::core::clean; use crate::core::merge; +use crate::core::store::{PendingOperationKind, load_operation, open_initialized}; use crate::core::sync::{ self, RemotePushActionKind, RemotePushOutcome, SyncCompletion, SyncEvent, SyncOptions, SyncStage, }; -use crate::core::tree::{self, TreeOptions}; +use crate::core::tree::{self, TreeOptions, TreeView}; use super::CommandOutcome; use super::common; @@ -25,21 +31,18 @@ pub struct SyncArgs { pub fn execute(args: SyncArgs) -> io::Result { let animate = io::stdout().is_terminal(); - let mut animation = SyncAnimationSession::default(); - let outcome = if animate { - sync::run_with_reporter(&args.clone().into(), &mut |event| animation.apply(event))? + let runtime = if animate { + Some(build_animation_runtime()?) + } else { + None + }; + let outcome = if let Some(runtime) = runtime.as_ref() { + let initial_local_view = load_initial_local_sync_view(&args)?; + execute_sync_with_animation(runtime, args.clone(), initial_local_view)? } else { sync::run(&args.clone().into())? }; - if animate { - if outcome.status.success() { - animation.finish_success()?; - } else { - animation.finish_failure()?; - } - } - let mut final_status = outcome.status; if let Some(completion) = &outcome.completion { @@ -162,7 +165,12 @@ pub fn execute(args: SyncArgs) -> io::Result { let clean_outcome = if animate { println!("Finished local sync. Moving on to cleanup."); println!(); - execute_cleanup_with_animation(&full_outcome.cleanup_plan)? + execute_cleanup_with_animation( + runtime + .as_ref() + .expect("animation runtime should exist for TTY sync"), + &full_outcome.cleanup_plan, + )? } else { clean::apply(&full_outcome.cleanup_plan)? }; @@ -282,10 +290,39 @@ pub fn execute(args: SyncArgs) -> io::Result { }) } -#[derive(Default)] +fn build_animation_runtime() -> io::Result { + Builder::new_current_thread().enable_time().build() +} + +fn load_initial_local_sync_view(args: &SyncArgs) -> io::Result> { + if !args.continue_operation { + return Ok(Some(tree::run(&TreeOptions::default())?.view)); + } + + let session = open_initialized("dagger is not initialized; run 'dgr init' first")?; + let pending_operation = load_operation(&session.paths)?; + + if matches!( + pending_operation + .as_ref() + .map(|operation| &operation.origin), + Some(PendingOperationKind::Sync(_)) + ) { + Ok(Some(tree::run(&TreeOptions::default())?.view)) + } else { + Ok(None) + } +} + +enum WorkerMessage { + Event(Event), + Finished(io::Result), +} + struct SyncAnimationSession { terminal: Option, stage: Option, + initial_local_view: Option, } enum ActiveSyncStage { @@ -294,6 +331,14 @@ enum ActiveSyncStage { } impl SyncAnimationSession { + fn new(initial_local_view: Option) -> Self { + Self { + terminal: None, + stage: None, + initial_local_view, + } + } + fn apply(&mut self, event: SyncEvent) -> io::Result<()> { match event { SyncEvent::StageStarted(SyncStage::LocalSync { .. }) => { @@ -305,8 +350,11 @@ impl SyncAnimationSession { return Ok(()); } - let outcome = tree::run(&TreeOptions::default())?; - let mut animation = render::SyncAnimation::new(&outcome.view); + let Some(view) = self.initial_local_view.take() else { + return Ok(()); + }; + + let mut animation = render::SyncAnimation::new(&view); animation.apply_event(&event); let mut terminal = render::AnimationTerminal::start()?; terminal.render(&animation.render_active())?; @@ -359,6 +407,23 @@ impl SyncAnimationSession { } } + fn tick(&mut self) -> io::Result<()> { + let Some(stage) = self.stage.as_mut() else { + return Ok(()); + }; + + let changed = match stage { + ActiveSyncStage::Local(animation) => animation.tick(), + ActiveSyncStage::Cleanup(animation) => animation.tick(), + }; + + if changed { + self.render_active()?; + } + + Ok(()) + } + fn finish_success(&mut self) -> io::Result<()> { let Some(mut terminal) = self.terminal.take() else { return Ok(()); @@ -405,18 +470,96 @@ impl SyncAnimationSession { } } -fn execute_cleanup_with_animation(plan: &clean::CleanPlan) -> io::Result { - let mut animation = super::clean::render::CleanAnimation::new(plan); +fn execute_sync_with_animation( + runtime: &Runtime, + args: SyncArgs, + initial_local_view: Option, +) -> io::Result { + runtime.block_on(execute_sync_with_animation_async( + args.into(), + initial_local_view, + )) +} + +async fn execute_sync_with_animation_async( + options: SyncOptions, + initial_local_view: Option, +) -> io::Result { + let (sender, mut receiver) = mpsc::channel::>(64); + let worker = tokio::task::spawn_blocking(move || { + let outcome = sync::run_with_reporter(&options, &mut |event| { + let _ = sender.blocking_send(WorkerMessage::Event(event.clone())); + Ok(()) + }); + let _ = sender.blocking_send(WorkerMessage::Finished(outcome)); + }); + + let mut animation = SyncAnimationSession::new(initial_local_view); + let outcome = drive_sync_animation(&mut animation, &mut receiver).await; + let worker_result = worker.await; + + if let Err(err) = worker_result { + return Err(io::Error::other(err.to_string())); + } + + let outcome = outcome?; + if outcome.status.success() { + animation.finish_success()?; + } else { + animation.finish_failure()?; + } + + Ok(outcome) +} + +async fn drive_sync_animation( + animation: &mut SyncAnimationSession, + receiver: &mut mpsc::Receiver>, +) -> io::Result { + loop { + match time::timeout(Duration::from_millis(80), receiver.recv()).await { + Ok(Some(WorkerMessage::Event(event))) => animation.apply(event)?, + Ok(Some(WorkerMessage::Finished(outcome))) => { + return drain_worker_messages(receiver, |event| animation.apply(event), outcome); + } + Ok(None) => return Err(io::Error::other("sync animation worker ended unexpectedly")), + Err(_) => animation.tick()?, + } + } +} + +fn execute_cleanup_with_animation( + runtime: &Runtime, + plan: &clean::CleanPlan, +) -> io::Result { + runtime.block_on(execute_cleanup_with_animation_async(plan.clone())) +} + +async fn execute_cleanup_with_animation_async( + plan: clean::CleanPlan, +) -> io::Result { + let mut animation = super::clean::render::CleanAnimation::new(&plan); let mut terminal = render::AnimationTerminal::start()?; terminal.render(&animation.render_active())?; - let outcome = clean::apply_with_reporter(plan, &mut |event| { - if animation.apply_event(&event) { - terminal.render(&animation.render_active())?; - } + let (sender, mut receiver) = + mpsc::channel::>(64); + let worker = tokio::task::spawn_blocking(move || { + let outcome = clean::apply_with_reporter(&plan, &mut |event| { + let _ = sender.blocking_send(WorkerMessage::Event(event.clone())); + Ok(()) + }); + let _ = sender.blocking_send(WorkerMessage::Finished(outcome)); + }); - Ok(()) - })?; + let outcome = drive_cleanup_animation(&mut terminal, &mut animation, &mut receiver).await; + let worker_result = worker.await; + + if let Err(err) = worker_result { + return Err(io::Error::other(err.to_string())); + } + + let outcome = outcome?; if outcome.status.success() { terminal.finish(&animation.render_final())?; @@ -432,6 +575,59 @@ fn execute_cleanup_with_animation(plan: &clean::CleanPlan) -> io::Result>, +) -> io::Result { + loop { + match time::timeout(Duration::from_millis(80), receiver.recv()).await { + Ok(Some(WorkerMessage::Event(event))) => { + if animation.apply_event(&event) { + terminal.render(&animation.render_active())?; + } + } + Ok(Some(WorkerMessage::Finished(outcome))) => { + return drain_worker_messages( + receiver, + |event| { + if animation.apply_event(&event) { + terminal.render(&animation.render_active())?; + } + + Ok(()) + }, + outcome, + ); + } + Ok(None) => { + return Err(io::Error::other( + "cleanup animation worker ended unexpectedly", + )); + } + Err(_) => { + if animation.tick() { + terminal.render(&animation.render_active())?; + } + } + } + } +} + +fn drain_worker_messages( + receiver: &mut mpsc::Receiver>, + mut apply_event: impl FnMut(Event) -> io::Result<()>, + mut outcome: io::Result, +) -> io::Result { + loop { + match receiver.try_recv() { + Ok(WorkerMessage::Event(event)) => apply_event(event)?, + Ok(WorkerMessage::Finished(next_outcome)) => outcome = next_outcome, + Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => return outcome, + } + } +} + impl From for SyncOptions { fn from(args: SyncArgs) -> Self { Self { diff --git a/src/cli/sync/render.rs b/src/cli/sync/render.rs index 79e6802..f8a13dc 100644 --- a/src/cli/sync/render.rs +++ b/src/cli/sync/render.rs @@ -87,6 +87,16 @@ impl SyncAnimation { ) } + pub fn tick(&mut self) -> bool { + let mut changed = false; + + for root in &mut self.roots { + changed |= tick_in_flight(root); + } + + changed + } + fn prime_resume( &mut self, restacked_branches: &[RestackPreview], @@ -199,6 +209,15 @@ impl SyncBranchStatus { total_commits: Some(total_commits), } } + + fn tick(&mut self) -> bool { + let Self::InFlight { frame_index, .. } = self else { + return false; + }; + + *frame_index = (*frame_index + 1) % markers::THROBBER_FRAMES.len(); + true + } } fn visual_node_from_tree(node: &TreeNode) -> VisualTreeNode { @@ -220,6 +239,16 @@ fn clear_in_flight(node: &mut VisualTreeNode) { } } +fn tick_in_flight(node: &mut VisualTreeNode) -> bool { + let mut changed = node.status.tick(); + + for child in &mut node.children { + changed |= tick_in_flight(child); + } + + changed +} + fn prune_final_nodes(nodes: &[VisualTreeNode]) -> Vec { let mut pruned = Vec::new(); @@ -443,4 +472,35 @@ mod tests { ) ); } + + #[test] + fn tick_advances_in_flight_throbber_without_changing_progress() { + let mut animation = SyncAnimation::new(&sample_view()); + + animation.apply_event(&SyncEvent::StageStarted(SyncStage::LocalSync { + phase: PendingSyncPhase::RestackOutdatedLocalStacks, + step_branch_name: "feat/auth".into(), + active_branch_name: "feat/auth-api".into(), + deleted_branches: Vec::new(), + restacked_branches: Vec::new(), + })); + animation.apply_event(&SyncEvent::RestackProgress { + branch_name: "feat/auth-api".into(), + onto_branch: "feat/auth".into(), + current_commit: 2, + total_commits: 5, + }); + + let before = animation.render_active(); + + assert!(animation.tick()); + + let after = animation.render_active(); + + assert!(before.contains("\u{1b}[38;5;208m/\u{1b}[0m")); + assert!(after.contains("\u{1b}[38;5;208m-\u{1b}[0m")); + assert!(before.contains("[2/5]")); + assert!(after.contains("[2/5]")); + assert!(after.contains("\u{1b}[38;5;208mfeat/auth-api\u{1b}[0m")); + } } diff --git a/src/core/git.rs b/src/core/git.rs index 5be1336..b805f20 100644 --- a/src/core/git.rs +++ b/src/core/git.rs @@ -180,13 +180,46 @@ pub fn rebase_onto_with_progress( new_base: &str, old_upstream: &str, branch_name: &str, + on_progress: F, +) -> io::Result +where + F: FnMut(RebaseProgress) -> io::Result<()>, +{ + run_rebase_with_progress( + Command::new("git").args(["rebase", "--onto", new_base, old_upstream, branch_name]), + on_progress, + ) +} + +pub fn continue_rebase_with_progress(on_progress: F) -> io::Result +where + F: FnMut(RebaseProgress) -> io::Result<()>, +{ + run_rebase_with_progress( + Command::new("git") + .env("GIT_EDITOR", "true") + .args(["rebase", "--continue"]), + on_progress, + ) +} + +pub fn continue_rebase() -> io::Result { + let output = Command::new("git") + .env("GIT_EDITOR", "true") + .args(["rebase", "--continue"]) + .output()?; + + output_to_git_command_output(output) +} + +fn run_rebase_with_progress( + command: &mut Command, mut on_progress: F, ) -> io::Result where F: FnMut(RebaseProgress) -> io::Result<()>, { - let mut child = Command::new("git") - .args(["rebase", "--onto", new_base, old_upstream, branch_name]) + let mut child = command .stdout(Stdio::null()) .stderr(Stdio::piped()) .spawn()?; @@ -225,15 +258,6 @@ where }) } -pub fn continue_rebase() -> io::Result { - let output = Command::new("git") - .env("GIT_EDITOR", "true") - .args(["rebase", "--continue"]) - .output()?; - - output_to_git_command_output(output) -} - pub fn init_repository() -> io::Result { Command::new("git").args(["init", "--quiet"]).status() } @@ -656,9 +680,24 @@ mod tests { }; use std::env; use std::fs; + #[cfg(unix)] + use std::os::unix::fs::PermissionsExt; + #[cfg(unix)] + use std::path::Path; use std::path::PathBuf; + #[cfg(unix)] + use std::process::Command; use uuid::Uuid; + #[cfg(unix)] + fn write_executable(path: &Path, script: &str) { + fs::write(path, script).unwrap(); + + let mut permissions = fs::metadata(path).unwrap().permissions(); + permissions.set_mode(0o755); + fs::set_permissions(path, permissions).unwrap(); + } + #[test] fn reports_in_progress_rebase_state() { let repo_git_dir = env::temp_dir().join(format!("dgr-git-{}", Uuid::new_v4())); @@ -712,6 +751,47 @@ mod tests { ); } + #[cfg(unix)] + #[test] + fn parses_continue_rebase_progress_from_streamed_git_output() { + let temp_dir = env::temp_dir().join(format!("dgr-git-{}", Uuid::new_v4())); + fs::create_dir_all(&temp_dir).unwrap(); + + let git_path = temp_dir.join("git"); + write_executable( + &git_path, + "#!/bin/sh\nprintf 'Rebasing (1/3)\\r' >&2\nsleep 0.1\nprintf 'Rebasing (2/3)\\r' >&2\nsleep 0.1\nprintf 'Rebasing (2/3)\\rSuccessfully rebased\\n' >&2\nexit 0\n", + ); + + let mut seen = Vec::new(); + let output = super::run_rebase_with_progress( + Command::new(&git_path).args(["rebase", "--continue"]), + |progress| { + seen.push(progress); + Ok(()) + }, + ) + .unwrap(); + + assert!(output.status.success()); + assert_eq!( + seen, + vec![ + RebaseProgress { + current: 1, + total: 3, + }, + RebaseProgress { + current: 2, + total: 3, + }, + ] + ); + assert!(output.stderr.contains("Successfully rebased")); + + fs::remove_dir_all(temp_dir).unwrap(); + } + #[test] fn parses_commit_metadata_records_from_git_log_output() { assert_eq!( diff --git a/src/core/sync.rs b/src/core/sync.rs index 56f73fd..eb4a4df 100644 --- a/src/core/sync.rs +++ b/src/core/sync.rs @@ -201,18 +201,13 @@ where ))); } - let continue_output = git::continue_rebase()?; - if !continue_output.status.success() { - return Ok(SyncOutcome { - status: continue_output.status, - completion: None, - failure_output: Some(continue_output.combined_output()), - paused: true, - }); - } - match pending_operation.origin.clone() { - crate::core::store::PendingOperationKind::Commit(payload) => { + PendingOperationKind::Commit(payload) => { + let continue_output = git::continue_rebase()?; + if !continue_output.status.success() { + return Ok(paused_continue_outcome(continue_output)); + } + let outcome = commit::resume_after_sync(pending_operation, payload)?; let status = outcome.status; let failure_output = outcome.failure_output.clone(); @@ -224,7 +219,12 @@ where paused, }) } - crate::core::store::PendingOperationKind::Adopt(payload) => { + PendingOperationKind::Adopt(payload) => { + let continue_output = git::continue_rebase()?; + if !continue_output.status.success() { + return Ok(paused_continue_outcome(continue_output)); + } + let outcome = adopt::resume_after_sync(pending_operation, payload)?; let status = outcome.status; let failure_output = outcome.failure_output.clone(); @@ -236,7 +236,12 @@ where paused, }) } - crate::core::store::PendingOperationKind::Merge(payload) => { + PendingOperationKind::Merge(payload) => { + let continue_output = git::continue_rebase()?; + if !continue_output.status.success() { + return Ok(paused_continue_outcome(continue_output)); + } + let outcome = merge::resume_after_sync(pending_operation, payload)?; let status = outcome.outcome.status; let failure_output = outcome.outcome.failure_output.clone(); @@ -248,16 +253,24 @@ where paused, }) } - crate::core::store::PendingOperationKind::Clean(payload) => { + PendingOperationKind::Clean(payload) => { let mut restacked_branches = payload.restacked_branches.clone(); restacked_branches.extend(pending_operation.completed_branches().iter().cloned()); + let active_action = pending_operation.active_action().clone(); reporter(SyncEvent::StageStarted(SyncStage::CleanupResume { plan: clean::plan_for_resume(&payload)?, - active_branch_name: pending_operation.active_action().branch_name.clone(), + active_branch_name: active_action.branch_name.clone(), untracked_branches: payload.untracked_branches.clone(), deleted_branches: payload.deleted_branches.clone(), restacked_branches, }))?; + let continue_output = git::continue_rebase_with_progress(|progress| { + report_cleanup_continue_progress(reporter, &active_action, progress) + })?; + if !continue_output.status.success() { + return Ok(paused_continue_outcome(continue_output)); + } + let trunk_branch = payload.trunk_branch.clone(); let outcome = clean::resume_after_sync_with_reporter(pending_operation, payload, &mut |event| { @@ -276,7 +289,12 @@ where paused, }) } - crate::core::store::PendingOperationKind::Orphan(payload) => { + PendingOperationKind::Orphan(payload) => { + let continue_output = git::continue_rebase()?; + if !continue_output.status.success() { + return Ok(paused_continue_outcome(continue_output)); + } + let outcome = orphan::resume_after_sync(pending_operation, payload)?; let status = outcome.status; let failure_output = outcome.failure_output.clone(); @@ -288,7 +306,12 @@ where paused, }) } - crate::core::store::PendingOperationKind::Reparent(payload) => { + PendingOperationKind::Reparent(payload) => { + let continue_output = git::continue_rebase()?; + if !continue_output.status.success() { + return Ok(paused_continue_outcome(continue_output)); + } + let outcome = reparent::resume_after_sync(pending_operation, payload)?; let status = outcome.status; let failure_output = outcome.failure_output.clone(); @@ -300,13 +323,32 @@ where paused, }) } - crate::core::store::PendingOperationKind::Sync(payload) => { - let outcome = resume_full_sync_with_reporter(pending_operation, payload, reporter)?; + PendingOperationKind::Sync(payload) => { + let active_action = pending_operation.active_action().clone(); + report_resumed_full_sync_stage_started(reporter, &pending_operation, &payload)?; + let continue_output = git::continue_rebase_with_progress(|progress| { + report_sync_continue_progress(reporter, &active_action, progress) + })?; + if !continue_output.status.success() { + return Ok(paused_continue_outcome(continue_output)); + } + + let outcome = + resume_full_sync_with_reporter(pending_operation, payload, reporter, false)?; finalize_full_sync_outcome(outcome) } } } +fn paused_continue_outcome(continue_output: git::GitCommandOutput) -> SyncOutcome { + SyncOutcome { + status: continue_output.status, + completion: None, + failure_output: Some(continue_output.combined_output()), + paused: true, + } +} + fn run_full_sync_with_reporter(reporter: &mut F) -> io::Result where F: FnMut(SyncEvent) -> io::Result<()>, @@ -343,27 +385,22 @@ fn resume_full_sync_with_reporter( pending_operation: PendingOperationState, payload: PendingSyncOperation, reporter: &mut F, + emit_stage_started: bool, ) -> io::Result where F: FnMut(SyncEvent) -> io::Result<()>, { let mut session = open_initialized("dagger is not initialized; run 'dgr init' first")?; clean::reconcile_branch_divergence_state(&mut session)?; + if emit_stage_started { + report_resumed_full_sync_stage_started(reporter, &pending_operation, &payload)?; + } + let mut progress = LocalSyncProgress { repaired_pull_requests: Vec::new(), deleted_branches: payload.deleted_branches, restacked_branches: payload.restacked_branches, }; - let mut resumed_restacked_branches = progress.restacked_branches.clone(); - resumed_restacked_branches.extend(pending_operation.completed_branches().iter().cloned()); - - reporter(SyncEvent::StageStarted(SyncStage::LocalSync { - phase: payload.phase, - step_branch_name: payload.step_branch_name.clone(), - active_branch_name: pending_operation.active_action().branch_name.clone(), - deleted_branches: progress.deleted_branches.clone(), - restacked_branches: resumed_restacked_branches, - }))?; let restack_outcome = workflow::continue_resumable_restack_operation( &mut session, @@ -395,6 +432,58 @@ where ) } +fn report_resumed_full_sync_stage_started( + reporter: &mut F, + pending_operation: &PendingOperationState, + payload: &PendingSyncOperation, +) -> io::Result<()> +where + F: FnMut(SyncEvent) -> io::Result<()>, +{ + let mut resumed_restacked_branches = payload.restacked_branches.clone(); + resumed_restacked_branches.extend(pending_operation.completed_branches().iter().cloned()); + + reporter(SyncEvent::StageStarted(SyncStage::LocalSync { + phase: payload.phase, + step_branch_name: payload.step_branch_name.clone(), + active_branch_name: pending_operation.active_action().branch_name.clone(), + deleted_branches: payload.deleted_branches.clone(), + restacked_branches: resumed_restacked_branches, + })) +} + +fn report_sync_continue_progress( + reporter: &mut F, + action: &RestackAction, + progress: git::RebaseProgress, +) -> io::Result<()> +where + F: FnMut(SyncEvent) -> io::Result<()>, +{ + reporter(SyncEvent::RestackProgress { + branch_name: action.branch_name.clone(), + onto_branch: action.new_base.branch_name.clone(), + current_commit: progress.current, + total_commits: progress.total, + }) +} + +fn report_cleanup_continue_progress( + reporter: &mut F, + action: &RestackAction, + progress: git::RebaseProgress, +) -> io::Result<()> +where + F: FnMut(SyncEvent) -> io::Result<()>, +{ + reporter(SyncEvent::Cleanup(clean::CleanEvent::RebaseProgress { + branch_name: action.branch_name.clone(), + onto_branch: action.new_base.branch_name.clone(), + current_commit: progress.current, + total_commits: progress.total, + })) +} + fn finalize_full_sync_outcome(outcome: LocalSyncOutcome) -> io::Result { if outcome.paused || !outcome.status.success() { return Ok(SyncOutcome {