Skip to content

Commit 97b978e

Browse files
committed
update
1 parent 5dc090c commit 97b978e

File tree

9 files changed

+527
-1
lines changed

9 files changed

+527
-1
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
use std::collections::HashMap;
2+
3+
use protocol::grpc::api::{FsEdge, FsNode};
4+
use tokio::sync::mpsc;
5+
6+
use crate::runtime::streaming::protocol::tracked::TrackedEvent;
7+
8+
/// Owns the channel endpoints for every pipeline in a physical execution
/// graph, keyed by pipeline id, until each pipeline claims its own pair
/// via `take_endpoints`.
pub struct EdgeManager {
    // PipelineID -> (input Receiver, list of output Senders).
    // `None` receiver = the node has no inbound edge (a source);
    // empty sender list = no outbound edge (a sink).
    endpoints: HashMap<u32, (Option<mpsc::Receiver<TrackedEvent>>, Vec<mpsc::Sender<TrackedEvent>>)>,
}
12+
13+
impl EdgeManager {
14+
pub fn build(nodes: &[FsNode], edges: &[FsEdge]) -> Self {
15+
let mut tx_map: HashMap<u32, Vec<mpsc::Sender<TrackedEvent>>> = HashMap::new();
16+
let mut rx_map: HashMap<u32, mpsc::Receiver<TrackedEvent>> = HashMap::new();
17+
18+
for edge in edges {
19+
let (tx, rx) = mpsc::channel(2048);
20+
tx_map.entry(edge.source as u32).or_default().push(tx);
21+
rx_map.insert(edge.target as u32, rx);
22+
}
23+
24+
let mut endpoints = HashMap::new();
25+
for node in nodes {
26+
let id = node.node_index as u32;
27+
endpoints.insert(id, (rx_map.remove(&id), tx_map.remove(&id).unwrap_or_default()));
28+
}
29+
30+
Self { endpoints }
31+
}
32+
33+
pub fn take_endpoints(
34+
&mut self,
35+
id: u32,
36+
) -> (Option<mpsc::Receiver<TrackedEvent>>, Vec<mpsc::Sender<TrackedEvent>>) {
37+
self.endpoints
38+
.remove(&id)
39+
.expect("Critical: Execution Graph Inconsistent")
40+
}
41+
}
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
use std::collections::HashMap;
2+
use std::sync::{Arc, RwLock};
3+
4+
use protocol::grpc::api::{ChainedOperator, FsProgram};
5+
use tokio::sync::mpsc;
6+
use tracing::error;
7+
8+
use crate::runtime::streaming::api::operator::ConstructedOperator;
9+
use crate::runtime::streaming::factory::OperatorFactory;
10+
use crate::runtime::streaming::job::edge_manager::EdgeManager;
11+
use crate::runtime::streaming::job::models::{PhysicalExecutionGraph, PhysicalPipeline, PipelineStatus};
12+
use crate::runtime::streaming::job::pipeline_runner::{FusionOperatorChain, PipelineRunner};
13+
use crate::runtime::streaming::memory::MemoryPool;
14+
use crate::runtime::streaming::protocol::control::{ControlCommand, StopMode};
15+
use crate::runtime::streaming::storage::manager::TableManager;
16+
17+
/// Creates, tracks, and stops streaming jobs. Each submitted job becomes a
/// `PhysicalExecutionGraph` whose pipelines run on dedicated OS threads.
pub struct JobManager {
    // job_id -> running physical graph; also cloned into pipeline threads.
    active_jobs: Arc<RwLock<HashMap<String, PhysicalExecutionGraph>>>,
    // Builds operators from the (name, config) pairs of the logical plan.
    operator_factory: Arc<OperatorFactory>,
    // Shared memory budget handed to every pipeline runner.
    memory_pool: Arc<MemoryPool>,
    // Optional shared table storage, passed through to pipeline runners.
    table_manager: Option<Arc<tokio::sync::Mutex<TableManager>>>,
}
23+
24+
impl JobManager {
25+
pub fn new(
26+
operator_factory: Arc<OperatorFactory>,
27+
max_memory_bytes: usize,
28+
table_manager: Option<Arc<tokio::sync::Mutex<TableManager>>>,
29+
) -> Self {
30+
Self {
31+
active_jobs: Arc::new(RwLock::new(HashMap::new())),
32+
operator_factory,
33+
memory_pool: MemoryPool::new(max_memory_bytes),
34+
table_manager,
35+
}
36+
}
37+
38+
/// 从逻辑计划点火物理线程
39+
pub async fn submit_job(&self, program: FsProgram) -> anyhow::Result<String> {
40+
let job_id = format!("job-{}", chrono::Utc::now().timestamp_millis());
41+
42+
let mut edge_manager = EdgeManager::build(&program.nodes, &program.edges);
43+
let mut physical_pipelines = HashMap::new();
44+
45+
for node in &program.nodes {
46+
let pipe_id = node.node_index as u32;
47+
let (inbox, outboxes) = edge_manager.take_endpoints(pipe_id);
48+
let chain = self.create_chain(&node.operators)?;
49+
let (ctrl_tx, ctrl_rx) = mpsc::channel(64);
50+
let status = Arc::new(RwLock::new(PipelineStatus::Initializing));
51+
52+
let thread_status = status.clone();
53+
let job_id_for_thread = job_id.clone();
54+
let exit_job_id = job_id_for_thread.clone();
55+
let registry_ptr = self.active_jobs.clone();
56+
let memory_pool = self.memory_pool.clone();
57+
let table_manager = self.table_manager.clone();
58+
59+
let handle = std::thread::Builder::new()
60+
.name(format!("Job-{}-Pipe-{}", job_id, pipe_id))
61+
.spawn(move || {
62+
{
63+
let mut st = thread_status.write().unwrap();
64+
*st = PipelineStatus::Running;
65+
}
66+
67+
let rt = tokio::runtime::Builder::new_current_thread()
68+
.enable_all()
69+
.build()
70+
.expect("build current thread runtime");
71+
72+
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
73+
rt.block_on(async move {
74+
let mut runner = PipelineRunner::new(
75+
pipe_id,
76+
chain,
77+
inbox,
78+
outboxes,
79+
ctrl_rx,
80+
job_id_for_thread.clone(),
81+
memory_pool,
82+
table_manager,
83+
);
84+
runner.run().await
85+
})
86+
}));
87+
88+
Self::on_pipeline_exit(exit_job_id, pipe_id, result, thread_status, registry_ptr);
89+
})?;
90+
91+
physical_pipelines.insert(
92+
pipe_id,
93+
PhysicalPipeline {
94+
pipeline_id: pipe_id,
95+
handle: Some(handle),
96+
status,
97+
control_tx: ctrl_tx,
98+
},
99+
);
100+
}
101+
102+
let graph = PhysicalExecutionGraph {
103+
job_id: job_id.clone(),
104+
program,
105+
pipelines: physical_pipelines,
106+
start_time: std::time::Instant::now(),
107+
};
108+
109+
self.active_jobs.write().unwrap().insert(job_id.clone(), graph);
110+
Ok(job_id)
111+
}
112+
113+
pub async fn stop_job(&self, job_id: &str, mode: StopMode) -> anyhow::Result<()> {
114+
let controllers = {
115+
let jobs = self.active_jobs.read().unwrap();
116+
let graph = jobs
117+
.get(job_id)
118+
.ok_or_else(|| anyhow::anyhow!("job not found: {job_id}"))?;
119+
graph
120+
.pipelines
121+
.values()
122+
.map(|p| p.control_tx.clone())
123+
.collect::<Vec<_>>()
124+
};
125+
126+
for tx in controllers {
127+
tx.send(ControlCommand::Stop { mode: mode.clone() }).await?;
128+
}
129+
Ok(())
130+
}
131+
132+
pub fn get_pipeline_statuses(&self, job_id: &str) -> Option<HashMap<u32, PipelineStatus>> {
133+
let jobs = self.active_jobs.read().unwrap();
134+
let graph = jobs.get(job_id)?;
135+
Some(
136+
graph
137+
.pipelines
138+
.iter()
139+
.map(|(id, pipeline)| (*id, pipeline.status.read().unwrap().clone()))
140+
.collect(),
141+
)
142+
}
143+
144+
fn create_chain(&self, operators: &[ChainedOperator]) -> anyhow::Result<FusionOperatorChain> {
145+
let mut chain = Vec::with_capacity(operators.len());
146+
for op in operators {
147+
match self
148+
.operator_factory
149+
.create_operator(&op.operator_name, &op.operator_config)?
150+
{
151+
ConstructedOperator::Operator(msg_op) => chain.push(msg_op),
152+
ConstructedOperator::Source(_) => {
153+
return Err(anyhow::anyhow!(
154+
"source operator '{}' cannot be used inside a physical pipeline chain",
155+
op.operator_name
156+
));
157+
}
158+
}
159+
}
160+
Ok(FusionOperatorChain::new(chain))
161+
}
162+
163+
fn on_pipeline_exit(
164+
job_id: String,
165+
pipe_id: u32,
166+
result: std::thread::Result<anyhow::Result<()>>,
167+
status: Arc<RwLock<PipelineStatus>>,
168+
_registry: Arc<RwLock<HashMap<String, PhysicalExecutionGraph>>>,
169+
) {
170+
let mut needs_abort = false;
171+
match result {
172+
Ok(Err(e)) => {
173+
*status.write().unwrap() = PipelineStatus::Failed {
174+
error: e.to_string(),
175+
is_panic: false,
176+
};
177+
needs_abort = true;
178+
}
179+
Err(_) => {
180+
*status.write().unwrap() = PipelineStatus::Failed {
181+
error: "panic".into(),
182+
is_panic: true,
183+
};
184+
needs_abort = true;
185+
}
186+
Ok(Ok(_)) => {
187+
*status.write().unwrap() = PipelineStatus::Finished;
188+
}
189+
}
190+
191+
if needs_abort {
192+
error!(
193+
"Pipeline {}-{} failed. Initiating Job Abort.",
194+
job_id, pipe_id
195+
);
196+
}
197+
}
198+
}

src/runtime/streaming/job/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pub mod edge_manager;
2+
pub mod job_manager;
3+
pub mod models;
4+
pub mod pipeline_runner;
5+
6+
pub use job_manager::JobManager;
7+
pub use models::{PhysicalExecutionGraph, PhysicalPipeline, PipelineStatus};
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
use std::collections::HashMap;
2+
use std::sync::{Arc, RwLock};
3+
use std::thread::JoinHandle;
4+
use std::time::Instant;
5+
6+
use protocol::grpc::api::FsProgram;
7+
use tokio::sync::mpsc;
8+
9+
use crate::runtime::streaming::protocol::control::ControlCommand;
10+
11+
/// Live status of a physical pipeline, shared between the pipeline thread
/// and the control plane behind an `Arc<RwLock<_>>`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PipelineStatus {
    /// Thread spawned, runtime not yet entered.
    Initializing,
    /// The pipeline runner's event loop is executing.
    Running,
    /// The runner returned an error (`is_panic == false`) or the thread
    /// panicked (`is_panic == true`).
    Failed { error: String, is_panic: bool },
    /// The runner completed normally.
    Finished,
    /// A stop command has been issued; a terminal status will follow.
    Stopping,
}
20+
21+
/// One execution unit (a single pipeline thread) inside a physical
/// execution graph.
pub struct PhysicalPipeline {
    // Pipeline id; set from the logical plan node's `node_index`.
    pub pipeline_id: u32,
    // Join handle of the dedicated OS thread; `Option` so it can be taken
    // for joining without moving the whole struct.
    pub handle: Option<JoinHandle<()>>,
    // Live status, shared with the pipeline thread that updates it.
    pub status: Arc<RwLock<PipelineStatus>>,
    // Command channel into the running pipeline (e.g. stop requests).
    pub control_tx: mpsc::Sender<ControlCommand>,
}
28+
29+
/// The physical execution graph of a single SQL job.
pub struct PhysicalExecutionGraph {
    // Unique id assigned when the job was submitted.
    pub job_id: String,
    // The logical plan this graph was built from.
    pub program: FsProgram,
    // Running pipelines, keyed by pipeline id.
    pub pipelines: HashMap<u32, PhysicalPipeline>,
    // Captured via `Instant::now()` at submission time.
    pub start_time: Instant,
}

0 commit comments

Comments
 (0)