Skip to content

Commit 723c4b2

Browse files
committed
update
1 parent eb57e1b commit 723c4b2

File tree

2 files changed

+91
-37
lines changed

2 files changed

+91
-37
lines changed

src/runtime/streaming/job/edge_manager.rs

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,14 @@
1212

1313
use std::collections::HashMap;
1414

15-
use protocol::grpc::api::{FsEdge, FsNode};
15+
use anyhow::{Result, anyhow};
1616
use tokio::sync::mpsc;
17+
use tracing::{debug, info, warn};
1718

1819
use crate::runtime::streaming::protocol::event::TrackedEvent;
20+
use protocol::grpc::api::{FsEdge, FsNode};
21+
22+
const DEFAULT_CHANNEL_CAPACITY: usize = 2048;
1923

2024
type TrackedEventEndpoints = (
2125
Vec<mpsc::Receiver<TrackedEvent>>,
@@ -28,34 +32,69 @@ pub struct EdgeManager {
2832

2933
impl EdgeManager {
3034
pub fn build(nodes: &[FsNode], edges: &[FsEdge]) -> Self {
31-
let mut tx_map: HashMap<u32, Vec<mpsc::Sender<TrackedEvent>>> = HashMap::new();
32-
let mut rx_map: HashMap<u32, Vec<mpsc::Receiver<TrackedEvent>>> = HashMap::new();
35+
Self::build_with_capacity(nodes, edges, DEFAULT_CHANNEL_CAPACITY)
36+
}
37+
38+
pub fn build_with_capacity(nodes: &[FsNode], edges: &[FsEdge], capacity: usize) -> Self {
39+
info!(
40+
"Building EdgeManager for {} nodes and {} edges (channel capacity: {})",
41+
nodes.len(),
42+
edges.len(),
43+
capacity
44+
);
45+
46+
let mut tx_map: HashMap<u32, Vec<mpsc::Sender<TrackedEvent>>> =
47+
HashMap::with_capacity(nodes.len());
48+
let mut rx_map: HashMap<u32, Vec<mpsc::Receiver<TrackedEvent>>> =
49+
HashMap::with_capacity(nodes.len());
3350

3451
for edge in edges {
35-
let (tx, rx) = mpsc::channel(2048);
36-
tx_map.entry(edge.source as u32).or_default().push(tx);
37-
rx_map.entry(edge.target as u32).or_default().push(rx);
52+
let source_id = edge.source as u32;
53+
let target_id = edge.target as u32;
54+
55+
let (tx, rx) = mpsc::channel(capacity);
56+
57+
tx_map.entry(source_id).or_default().push(tx);
58+
rx_map.entry(target_id).or_default().push(rx);
59+
60+
debug!(
61+
"Created physical edge channel: Node {} -> Node {}",
62+
source_id, target_id
63+
);
3864
}
3965

40-
let mut endpoints = HashMap::new();
66+
let mut endpoints = HashMap::with_capacity(nodes.len());
4167
for node in nodes {
4268
let id = node.node_index as u32;
69+
4370
let inboxes = rx_map.remove(&id).unwrap_or_default();
44-
endpoints.insert(id, (inboxes, tx_map.remove(&id).unwrap_or_default()));
71+
let outboxes = tx_map.remove(&id).unwrap_or_default();
72+
73+
endpoints.insert(id, (inboxes, outboxes));
74+
}
75+
76+
for remaining_target in rx_map.keys() {
77+
warn!(
78+
"Topology Warning: Found incoming edges pointing to non-existent Node {}",
79+
remaining_target
80+
);
81+
}
82+
for remaining_source in tx_map.keys() {
83+
warn!(
84+
"Topology Warning: Found outgoing edges coming from non-existent Node {}",
85+
remaining_source
86+
);
4587
}
4688

4789
Self { endpoints }
4890
}
4991

50-
pub fn take_endpoints(
51-
&mut self,
52-
id: u32,
53-
) -> (
54-
Vec<mpsc::Receiver<TrackedEvent>>,
55-
Vec<mpsc::Sender<TrackedEvent>>,
56-
) {
92+
pub fn take_endpoints(&mut self, id: u32) -> Result<TrackedEventEndpoints> {
5793
self.endpoints
5894
.remove(&id)
59-
.expect("Critical: Execution Graph Inconsistent")
95+
.ok_or_else(|| anyhow!(
96+
"Topology Error: Endpoints for Node {} not found or already taken. Execution Graph may be inconsistent.",
97+
id
98+
))
6099
}
61100
}

src/runtime/streaming/job/job_manager.rs

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
use std::collections::HashMap;
1414
use std::sync::{Arc, OnceLock, RwLock};
1515

16-
use anyhow::{Context, Result, anyhow, bail};
16+
use anyhow::{Context, Result, anyhow, bail, ensure};
1717
use tokio::sync::mpsc;
1818
use tokio_stream::wrappers::ReceiverStream;
1919
use tracing::{error, info, warn};
@@ -330,40 +330,52 @@ impl JobManager {
330330
operators: &[ChainedOperator],
331331
edge_manager: &mut EdgeManager,
332332
) -> Result<PhysicalPipeline> {
333-
let (raw_inboxes, raw_outboxes) = edge_manager.take_endpoints(pipeline_id);
333+
let (raw_inboxes, raw_outboxes) =
334+
edge_manager.take_endpoints(pipeline_id).with_context(|| {
335+
format!(
336+
"Failed to retrieve network endpoints for pipeline {}",
337+
pipeline_id
338+
)
339+
})?;
334340

335-
let physical_outboxes = raw_outboxes
341+
let physical_outboxes: Vec<PhysicalSender> = raw_outboxes
336342
.into_iter()
337343
.map(PhysicalSender::Local)
338344
.collect();
345+
339346
let physical_inboxes: Vec<BoxedEventStream> = raw_inboxes
340347
.into_iter()
341348
.map(|rx| Box::pin(ReceiverStream::new(rx)) as _)
342349
.collect();
343350

344-
let chain = self.build_operator_chain(operators)?;
345-
346-
if chain.source.is_none() && physical_inboxes.is_empty() {
347-
bail!(
348-
"Topology Error: pipeline '{}' contains no source and no upstream inputs.",
349-
pipeline_id
350-
);
351-
}
352-
if chain.source.is_some() && !physical_inboxes.is_empty() {
353-
bail!(
354-
"Topology Error: source pipeline '{}' should not have upstream inputs.",
351+
let chain = self.build_operator_chain(operators).with_context(|| {
352+
format!(
353+
"Failed to build operator chain for pipeline {}",
355354
pipeline_id
356-
);
357-
}
355+
)
356+
})?;
357+
358+
ensure!(
359+
chain.source.is_some() || !physical_inboxes.is_empty(),
360+
"Topology Error: Pipeline '{}' contains no source and has no upstream inputs (Dead end).",
361+
pipeline_id
362+
);
363+
ensure!(
364+
chain.source.is_none() || physical_inboxes.is_empty(),
365+
"Topology Error: Source pipeline '{}' cannot have upstream inputs.",
366+
pipeline_id
367+
);
358368

359369
let (control_tx, control_rx) = mpsc::channel(64);
360370
let status = Arc::new(RwLock::new(PipelineStatus::Initializing));
361371

372+
let subtask_index = 0;
373+
let parallelism = 1;
362374
let ctx = TaskContext::new(
363375
job_id.clone(),
364376
pipeline_id,
365-
0,
366-
1,
377+
subtask_index,
378+
parallelism,
367379
physical_outboxes,
368380
Arc::clone(&self.memory_pool),
369381
);
@@ -373,12 +385,15 @@ impl JobManager {
373385
PipelineRunner::Source(SourceDriver::new(source, chain_head, ctx, control_rx))
374386
} else {
375387
PipelineRunner::Standard(
376-
Pipeline::new(chain.operators, ctx, physical_inboxes, control_rx)
377-
.map_err(|e| anyhow!("Pipeline init failed: {e}"))?,
388+
Pipeline::new(chain.operators, ctx, physical_inboxes, control_rx).with_context(
389+
|| format!("Failed to initialize Standard Pipeline {}", pipeline_id),
390+
)?,
378391
)
379392
};
380393

381-
let handle = self.spawn_worker_thread(job_id, pipeline_id, runner, Arc::clone(&status))?;
394+
let handle = self
395+
.spawn_worker_thread(job_id, pipeline_id, runner, Arc::clone(&status))
396+
.with_context(|| format!("Failed to spawn OS thread for pipeline {}", pipeline_id))?;
382397

383398
Ok(PhysicalPipeline {
384399
pipeline_id,

0 commit comments

Comments
 (0)