Skip to content

Implement session management and limits #142

@inureyes

Description

@inureyes

Summary

Implement session management features including per-user session limits, idle timeout, and session tracking.

Parent Epic

Implementation Details

1. Session Manager

// src/server/session.rs
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use uuid::Uuid;

/// Session manager for tracking and limiting connections.
///
/// Holds two maps, each behind its own async `RwLock`: one from session ID to
/// session details and one from user name to that user's session IDs. The
/// limits and timeouts applied by the methods come from `config`.
pub struct SessionManager {
    /// Active sessions by ID. Entries are inserted by `create_session` and
    /// removed by `remove_session`.
    sessions: RwLock<HashMap<SessionId, SessionInfo>>,
    /// Session IDs grouped by user name (for per-user limits). An ID is added
    /// when `authenticate_session` succeeds for that user.
    user_sessions: RwLock<HashMap<String, Vec<SessionId>>>,
    /// Limits and timeouts supplied to `SessionManager::new`.
    config: SessionConfig,
}

/// Limits and timeouts enforced by `SessionManager`.
#[derive(Debug, Clone)]
pub struct SessionConfig {
    /// Maximum sessions per user; checked by `authenticate_session` against
    /// the number of sessions already tracked for that user.
    pub max_sessions_per_user: usize,
    /// Maximum total sessions (authenticated or not); checked by
    /// `create_session`.
    pub max_total_sessions: usize,
    /// Idle timeout (disconnect if no activity). `get_idle_sessions` reports a
    /// session once the time since its `last_activity` is strictly greater
    /// than this value.
    pub idle_timeout: Duration,
    /// Session timeout (maximum session duration), measured from
    /// `started_at`. `None` means no maximum duration is applied.
    pub session_timeout: Option<Duration>,
}

impl Default for SessionConfig {
    fn default() -> Self {
        Self {
            max_sessions_per_user: 10,
            max_total_sessions: 1000,
            idle_timeout: Duration::from_secs(3600),
            session_timeout: None,
        }
    }
}

/// Identifier for a tracked session: a newtype around a `Uuid`.
///
/// It is `Copy` and hashable, and is used as the key of the session map.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SessionId(Uuid);

impl SessionId {
    /// Generates a fresh identifier backed by a version 4 UUID.
    pub fn new() -> Self {
        Self(Uuid::new_v4())
    }
}

// A public zero-argument `new` without a matching `Default` trips clippy's
// `new_without_default`; providing it also lets the type be used with
// `..Default::default()` and `or_default()`-style APIs.
impl Default for SessionId {
    /// Equivalent to [`SessionId::new`]: every call produces a new ID.
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Display for SessionId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// Snapshot of everything the manager records about one session.
#[derive(Debug, Clone)]
pub struct SessionInfo {
    /// Key under which this session is stored in the manager.
    pub id: SessionId,
    /// User name set by `authenticate_session`; `None` until then.
    pub user: Option<String>,
    /// Remote address passed to `create_session`.
    pub peer_addr: SocketAddr,
    /// When the session was created; basis for the optional `session_timeout`.
    pub started_at: Instant,
    /// Updated by `touch` and on authentication; basis for `idle_timeout`.
    pub last_activity: Instant,
    /// `false` at creation, set to `true` by `authenticate_session`.
    pub authenticated: bool,
    /// Starts empty at creation.
    // NOTE(review): no visible code adds or removes channels yet; presumably
    // the connection handler will maintain this list. Confirm.
    pub channels: Vec<ChannelInfo>,
}

/// Description of a single channel recorded in `SessionInfo::channels`.
#[derive(Debug, Clone)]
pub struct ChannelInfo {
    /// Numeric channel identifier.
    // NOTE(review): presumably the protocol-level channel number. Confirm
    // against the handler that fills this in.
    pub id: u32,
    /// What kind of channel this is.
    pub channel_type: ChannelType,
    /// When the channel was opened.
    pub opened_at: Instant,
}

/// Kind of channel opened within a session.
// NOTE(review): the variant names look like the SSH channel types
// ("session", "direct-tcpip", "forwarded-tcpip"). Confirm before relying on
// that mapping.
#[derive(Debug, Clone)]
pub enum ChannelType {
    /// Channel with no associated host/port.
    Session,
    /// TCP/IP channel described by a host and port.
    // presumably the forwarding destination; verify against the caller
    DirectTcpIp { host: String, port: u16 },
    /// Forwarded TCP/IP channel described by a host and port.
    // presumably the bound/listening address; verify against the caller
    ForwardedTcpIp { host: String, port: u16 },
}

/// Session bookkeeping.
///
/// Locking discipline: whenever both maps are needed, `sessions` is locked
/// first and `user_sessions` second, and every check-then-modify sequence
/// runs under a single set of guards so concurrent callers cannot slip past
/// a limit or leave the two maps disagreeing.
impl SessionManager {
    /// Creates a manager with no sessions that enforces the limits in `config`.
    pub fn new(config: SessionConfig) -> Self {
        Self {
            sessions: RwLock::new(HashMap::new()),
            user_sessions: RwLock::new(HashMap::new()),
            config,
        }
    }

    /// Registers a new, unauthenticated session for `peer_addr` and returns
    /// its ID.
    ///
    /// # Errors
    ///
    /// Returns [`SessionError::TooManySessions`] when
    /// `max_total_sessions` sessions are already tracked.
    pub async fn create_session(&self, peer_addr: SocketAddr) -> Result<SessionId, SessionError> {
        // The limit check and the insert share one write guard; checking
        // under a read lock and inserting later would let concurrent callers
        // all pass the check and overshoot the limit.
        let mut sessions = self.sessions.write().await;
        if sessions.len() >= self.config.max_total_sessions {
            return Err(SessionError::TooManySessions);
        }

        let id = SessionId::new();
        let now = Instant::now();
        sessions.insert(
            id,
            SessionInfo {
                id,
                user: None,
                peer_addr,
                started_at: now,
                last_activity: now,
                authenticated: false,
                channels: Vec::new(),
            },
        );
        drop(sessions);

        tracing::debug!("Session created: {} from {}", id, peer_addr);
        Ok(id)
    }

    /// Marks `session_id` as authenticated for `user` and counts it against
    /// that user's session limit.
    ///
    /// Re-authenticating a session for the user it already belongs to is
    /// idempotent (it is not counted twice). Authenticating it for a
    /// different user moves it from the previous user's list to the new one.
    /// A `max_sessions_per_user` of 0 rejects every new authentication.
    ///
    /// # Errors
    ///
    /// * [`SessionError::TooManyUserSessions`] if `user` already has
    ///   `max_sessions_per_user` other sessions.
    /// * [`SessionError::SessionNotFound`] if `session_id` is not tracked.
    pub async fn authenticate_session(
        &self,
        session_id: SessionId,
        user: &str,
    ) -> Result<(), SessionError> {
        {
            // Both guards are held for the whole check/update/track sequence
            // so the per-user limit cannot be raced and a concurrent removal
            // cannot leave a stale ID behind in `user_sessions`.
            let mut sessions = self.sessions.write().await;
            let mut user_sessions = self.user_sessions.write().await;

            let (tracked_count, already_tracked) = match user_sessions.get(user) {
                Some(ids) => (ids.len(), ids.contains(&session_id)),
                None => (0, false),
            };
            if !already_tracked && tracked_count >= self.config.max_sessions_per_user {
                return Err(SessionError::TooManyUserSessions(user.to_string()));
            }

            let session = sessions
                .get_mut(&session_id)
                .ok_or(SessionError::SessionNotFound)?;

            // If the session belonged to someone else, stop counting it
            // against that user.
            if let Some(previous) = session.user.replace(user.to_string()) {
                if previous != user {
                    Self::untrack(&mut user_sessions, &previous, session_id);
                }
            }
            session.authenticated = true;
            session.last_activity = Instant::now();

            if !already_tracked {
                user_sessions
                    .entry(user.to_string())
                    .or_default()
                    .push(session_id);
            }
        }

        tracing::info!("Session {} authenticated for user {}", session_id, user);
        Ok(())
    }

    /// Updates the activity timestamp of `session_id`; does nothing if the
    /// session is unknown.
    pub async fn touch(&self, session_id: SessionId) {
        let mut sessions = self.sessions.write().await;
        if let Some(session) = sessions.get_mut(&session_id) {
            session.last_activity = Instant::now();
        }
    }

    /// Removes `session_id` and its per-user tracking entry; does nothing if
    /// the session is unknown.
    pub async fn remove_session(&self, session_id: SessionId) {
        if self.remove_and_untrack(session_id).await {
            tracing::debug!("Session removed: {}", session_id);
        }
    }

    /// Returns the IDs of sessions that should be timed out: idle for longer
    /// than `idle_timeout`, or, when `session_timeout` is set, alive for
    /// longer than that maximum.
    pub async fn get_idle_sessions(&self) -> Vec<SessionId> {
        let sessions = self.sessions.read().await;
        // Sample the clock only after the lock is held, and saturate, so a
        // timestamp written while we were waiting can never be "later than
        // now" and trip the subtraction.
        let now = Instant::now();

        sessions
            .iter()
            .filter(|(_, info)| {
                let idle =
                    now.saturating_duration_since(info.last_activity) > self.config.idle_timeout;
                let too_old = self.config.session_timeout.map_or(false, |max_duration| {
                    now.saturating_duration_since(info.started_at) > max_duration
                });
                idle || too_old
            })
            .map(|(id, _)| *id)
            .collect()
    }

    /// Returns current counts of sessions, authenticated sessions and users
    /// with at least one tracked session.
    pub async fn get_stats(&self) -> SessionStats {
        let sessions = self.sessions.read().await;
        let user_sessions = self.user_sessions.read().await;

        SessionStats {
            total_sessions: sessions.len(),
            authenticated_sessions: sessions.values().filter(|s| s.authenticated).count(),
            unique_users: user_sessions.len(),
        }
    }

    /// Returns a cloned snapshot of all active sessions (for admin use).
    pub async fn list_sessions(&self) -> Vec<SessionInfo> {
        let sessions = self.sessions.read().await;
        sessions.values().cloned().collect()
    }

    /// Force-removes a session. Returns `true` only for the caller that
    /// actually removed it.
    pub async fn kill_session(&self, session_id: SessionId) -> bool {
        // Removal itself reports whether the session existed; a separate
        // `contains_key` check would let two concurrent callers both claim
        // the kill.
        let existed = self.remove_and_untrack(session_id).await;
        if existed {
            tracing::debug!("Session removed: {}", session_id);
            tracing::info!("Session {} killed by admin", session_id);
        }
        existed
    }

    /// Removes the session from both maps and reports whether it existed.
    async fn remove_and_untrack(&self, session_id: SessionId) -> bool {
        // `sessions` stays locked while `user_sessions` is updated so the
        // two maps change together (lock order: sessions, then users).
        let mut sessions = self.sessions.write().await;
        let removed = match sessions.remove(&session_id) {
            Some(info) => info,
            None => return false,
        };

        if let Some(user) = removed.user.as_deref() {
            let mut user_sessions = self.user_sessions.write().await;
            Self::untrack(&mut user_sessions, user, session_id);
        }
        true
    }

    /// Drops `session_id` from `user`'s list and deletes the list once it is
    /// empty, so `unique_users` only counts users that still have sessions
    /// and the map does not accumulate dead keys.
    fn untrack(
        user_sessions: &mut HashMap<String, Vec<SessionId>>,
        user: &str,
        session_id: SessionId,
    ) {
        let now_empty = match user_sessions.get_mut(user) {
            Some(ids) => {
                ids.retain(|id| *id != session_id);
                ids.is_empty()
            }
            None => false,
        };
        if now_empty {
            user_sessions.remove(user);
        }
    }
}

/// Point-in-time counters produced by `SessionManager::get_stats`.
#[derive(Debug)]
pub struct SessionStats {
    /// Number of entries in the session map (authenticated or not).
    pub total_sessions: usize,
    /// Number of sessions whose `authenticated` flag is set.
    pub authenticated_sessions: usize,
    /// Number of user names present as keys in the per-user session map.
    pub unique_users: usize,
}

/// Errors returned by `SessionManager` operations.
#[derive(Debug, thiserror::Error)]
pub enum SessionError {
    /// Returned by `create_session` when `max_total_sessions` is reached.
    #[error("Too many concurrent sessions")]
    TooManySessions,
    /// Returned by `authenticate_session` when the named user has reached
    /// `max_sessions_per_user`; carries the user name.
    #[error("Too many sessions for user: {0}")]
    TooManyUserSessions(String),
    /// Returned by `authenticate_session` when the session ID is not tracked.
    #[error("Session not found")]
    SessionNotFound,
}

2. Idle Timeout Checker

/// Background task that periodically drops timed-out sessions.
///
/// On every tick of a 60-second interval it asks the manager for the sessions
/// reported by `get_idle_sessions` and removes each of them. The task returns
/// as soon as `shutdown.recv()` completes, whether with a value or an error.
pub async fn idle_session_checker(
    session_manager: Arc<SessionManager>,
    mut shutdown: tokio::sync::broadcast::Receiver<()>,
) {
    const CHECK_PERIOD: Duration = Duration::from_secs(60);
    let mut ticker = tokio::time::interval(CHECK_PERIOD);

    loop {
        tokio::select! {
            _ = shutdown.recv() => return,
            _ = ticker.tick() => {
                for expired in session_manager.get_idle_sessions().await {
                    tracing::info!("Terminating idle session: {}", expired);
                    session_manager.remove_session(expired).await;
                    // TODO: Actually close the SSH connection
                }
            }
        }
    }
}

Configuration

# Example values; they match the SessionConfig defaults except session_timeout,
# which defaults to "not set" in code.
# NOTE(review): the config loader is not shown here — confirm these keys live
# under `security` and map one-to-one onto SessionConfig.
security:
  # Maximum sessions per user
  max_sessions_per_user: 10
  # Maximum total sessions
  max_total_sessions: 1000
  idle_timeout: 3600        # seconds
  session_timeout: 86400    # seconds (optional, max session duration)

Files to Create/Modify

| File | Action |
| --- | --- |
| `src/server/session.rs` | Create - Session manager |
| `src/server/mod.rs` | Modify - Add session module |
| `src/server/handler.rs` | Modify - Integrate session tracking |
| `src/server/config/types.rs` | Modify - Add session config |

Testing Requirements

  1. Unit test: Session creation
  2. Unit test: Per-user session limit
  3. Unit test: Total session limit
  4. Unit test: Idle timeout detection
  5. Unit test: Activity tracking
  6. Unit test: Session removal

Acceptance Criteria

  • SessionManager implementation
  • Per-user session limits
  • Total session limits
  • Idle timeout detection
  • Optional max session duration
  • Activity tracking (touch)
  • Background idle checker
  • Session statistics
  • Admin session list/kill
  • Tests passing

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions