Skip to content
17 changes: 17 additions & 0 deletions src/connection/channels.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
//! Connection channel bundling for actor construction.

use crate::push::{PushHandle, PushQueues};

/// Bundles push queues with their shared handle for actor construction.
pub struct ConnectionChannels<F> {
/// Receivers for high- and low-priority frames consumed by the actor.
pub queues: PushQueues<F>,
/// Handle cloned by producers to enqueue frames into the shared queues.
pub handle: PushHandle<F>,
}

impl<F> ConnectionChannels<F> {
/// Create a new bundle of push queues and their associated handle.
#[must_use]
pub fn new(queues: PushQueues<F>, handle: PushHandle<F>) -> Self { Self { queues, handle } }
}
32 changes: 32 additions & 0 deletions src/connection/counter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//! Active connection counting and RAII guard.

use std::sync::atomic::{AtomicU64, Ordering};

/// Global gauge tracking active connections.
static ACTIVE_CONNECTIONS: AtomicU64 = AtomicU64::new(0);

/// RAII guard incrementing [`ACTIVE_CONNECTIONS`] on creation and
/// decrementing it on drop.
pub(super) struct ActiveConnection;

impl ActiveConnection {
pub(super) fn new() -> Self {
ACTIVE_CONNECTIONS.fetch_add(1, Ordering::Relaxed);
crate::metrics::inc_connections();
Self
}
}

impl Drop for ActiveConnection {
fn drop(&mut self) {
ACTIVE_CONNECTIONS.fetch_sub(1, Ordering::Relaxed);
crate::metrics::dec_connections();
}
}

/// Return the current number of active connections.
#[must_use]
pub fn active_connection_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) }

/// Load the current count for logging purposes.
pub(super) fn current_count() -> u64 { ACTIVE_CONNECTIONS.load(Ordering::Relaxed) }
35 changes: 25 additions & 10 deletions src/connection/drain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,23 @@

use tokio::sync::mpsc::error::TryRecvError;

use super::{
ConnectionActor,
DrainContext,
QueueKind,
multi_packet::MultiPacketTerminationReason,
state::ActorState,
};
use super::{ConnectionActor, multi_packet::MultiPacketTerminationReason, state::ActorState};
use crate::{app::Packet, correlation::CorrelatableFrame, push::FrameLike};

/// Context for drain operations containing mutable references to output and actor state.
pub(super) struct DrainContext<'a, F> {
pub(super) out: &'a mut Vec<F>,
pub(super) state: &'a mut ActorState,
}

/// Queue variants processed by the connection actor.
#[derive(Clone, Copy)]
pub(super) enum QueueKind {
High,
Low,
Multi,
}

impl<F, E> ConnectionActor<F, E>
where
F: FrameLike + CorrelatableFrame + Packet,
Expand All @@ -36,8 +44,12 @@ where
let DrainContext { out, state } = ctx;
match res {
Some(frame) => {
let is_stamping = self
.active_output
.multi_packet_mut()
.is_some_and(|ctx| ctx.is_stamping_enabled());
match kind {
QueueKind::Multi if self.multi_packet.is_stamping_enabled() => {
QueueKind::Multi if is_stamping => {
self.emit_multi_packet_frame(frame, out);
}
_ => {
Expand Down Expand Up @@ -144,8 +156,11 @@ where
}
}
QueueKind::Multi => {
let result = match self.multi_packet.channel_mut() {
Some(rx) => rx.try_recv(),
let result = match self.active_output.multi_packet_mut() {
Some(ctx) => match ctx.channel_mut() {
Some(rx) => rx.try_recv(),
None => return false,
},
None => return false,
};

Expand Down
8 changes: 6 additions & 2 deletions src/connection/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ where

/// Apply correlation stamping to a multi-packet frame.
pub(super) fn apply_multi_packet_correlation(&mut self, frame: &mut F) {
if !self.multi_packet.is_stamping_enabled() {
let Some(ctx) = self.active_output.multi_packet_mut() else {
// No channel is active, so there is nothing to stamp.
return;
};

if !ctx.is_stamping_enabled() {
return;
}

let correlation_id = self.multi_packet.correlation_id();
let correlation_id = ctx.correlation_id();
frame.set_correlation_id(correlation_id);

if let Some(expected) = correlation_id {
Expand Down
Loading
Loading