From 69b7a7806a7311e8436f4ee2222b16558a2fc7a3 Mon Sep 17 00:00:00 2001 From: HynoR <20227709+HynoR@users.noreply.github.com> Date: Tue, 9 Dec 2025 08:29:47 +0800 Subject: [PATCH 1/6] feat: Enhance WebSocket client functionality and improve data processing - Reduced message queue size in WebSocket client from 100 to 32. - Introduced atomic boolean to track client closure state. - Added SendPayload method to handle message sending with queue management. - Updated ProcessData function to utilize SendPayload for sending responses. - Expanded netTypes to include both IPv4 and IPv6 protocols in network connection retrieval. - Improved net connection processing by using a map for process names, enhancing efficiency. --- agent/utils/websocket/client.go | 26 ++++++++++++- agent/utils/websocket/process_data.go | 56 +++++++++++++++------------ 2 files changed, 55 insertions(+), 27 deletions(-) diff --git a/agent/utils/websocket/client.go b/agent/utils/websocket/client.go index 368eeeee9620..df1b8f6df50c 100644 --- a/agent/utils/websocket/client.go +++ b/agent/utils/websocket/client.go @@ -1,26 +1,32 @@ package websocket import ( + "sync/atomic" + "github.com/gorilla/websocket" ) +const MaxMessageQuenue = 32 + type Client struct { ID string Socket *websocket.Conn Msg chan []byte + closed atomic.Bool } func NewWsClient(ID string, socket *websocket.Conn) *Client { return &Client{ ID: ID, Socket: socket, - Msg: make(chan []byte, 100), + Msg: make(chan []byte, 32), + closed: atomic.Bool{}, } } func (c *Client) Read() { defer func() { - close(c.Msg) + c.Close() }() for { _, message, err := c.Socket.ReadMessage() @@ -43,3 +49,19 @@ func (c *Client) Write() { _ = c.Socket.WriteMessage(websocket.TextMessage, message) } } + +func (c *Client) SendPayload(res []byte) { + if c.closed.Load() { + return + } + select { + case c.Msg <- res: + return + default: + select { + case <-c.Msg: + default: + } + c.Msg <- res + } +} diff --git a/agent/utils/websocket/process_data.go b/agent/utils/websocket/process_data.go index 7ac0b177ab0d..7d09bed80067 100644 --- a/agent/utils/websocket/process_data.go +++ b/agent/utils/websocket/process_data.go @@ -7,8 +7,8 @@ import ( "strings" "time" - "github.com/1Panel-dev/1Panel/agent/utils/common" "github.com/1Panel-dev/1Panel/agent/global" + "github.com/1Panel-dev/1Panel/agent/utils/common" "github.com/1Panel-dev/1Panel/agent/utils/files" "github.com/shirou/gopsutil/v4/host" "github.com/shirou/gopsutil/v4/net" @@ -113,25 +113,25 @@ func ProcessData(c *Client, inputMsg []byte) { if err != nil { return } - c.Msg <- res + c.SendPayload(res) case "ps": res, err := getProcessData(wsInput.PsProcessConfig) if err != nil { return } - c.Msg <- res + c.SendPayload(res) case "ssh": res, err := getSSHSessions(wsInput.SSHSessionConfig) if err != nil { return } - c.Msg <- res + c.SendPayload(res) case "net": res, err := getNetConnections(wsInput.NetConfig) if err != nil { return } - c.Msg <- res + c.SendPayload(res) } } @@ -312,29 +312,36 @@ func getSSHSessions(config SSHSessionConfig) (res []byte, err error) { return } -var netTypes = [...]string{"tcp", "udp"} +var netTypes = [...]string{"tcp", "udp", "tcp6", "udp6"} func getNetConnections(config NetConfig) (res []byte, err error) { - var ( - result []ProcessConnect - proc *process.Process - ) + ctx := context.Background() + processes, err := process.ProcessesWithContext(ctx) + if err != nil { + return + } + + procPidMap := make(map[int32]string, len(processes)) + for _, proc := range processes { + name, _ := proc.Name() + if name == "" { + continue + } + if config.ProcessName != "" && !strings.Contains(name, config.ProcessName) { + continue + } + if config.ProcessID > 0 && config.ProcessID != proc.Pid { + continue + } + procPidMap[proc.Pid] = name + } + + result := make([]ProcessConnect, 0, len(processes)) for _, netType := range netTypes { - connections, _ := net.Connections(netType) + connections, err := net.ConnectionsWithContext(ctx, netType) if err == nil { for _, conn := range connections { - if config.ProcessID > 0 && config.ProcessID != conn.Pid { - continue - } - proc, err = process.NewProcess(conn.Pid) - if err == nil { - name, _ := proc.Name() - if name != "" && config.ProcessName != "" && !strings.Contains(name, config.ProcessName) { - continue - } - if config.Port > 0 && config.Port != conn.Laddr.Port && config.Port != conn.Raddr.Port { - continue - } + if name, ok := procPidMap[conn.Pid]; ok { result = append(result, ProcessConnect{ Type: netType, Status: conn.Status, @@ -344,10 +351,9 @@ func getNetConnections(config NetConfig) (res []byte, err error) { Name: name, }) } - } } } - res, err = json.Marshal(result) + res, _ = json.Marshal(result) return } From d0b1bd8cd50dfdfd3d4f7642b491893e69c68528 Mon Sep 17 00:00:00 2001 From: HynoR <20227709+HynoR@users.noreply.github.com> Date: Tue, 9 Dec 2025 08:56:56 +0800 Subject: [PATCH 2/6] feat: Enhance WebSocket client and process data handling - Added synchronization with sync.Once for safe closure of WebSocket client. - Updated message queue size to a constant for better maintainability. - Implemented context timeouts for process data retrieval to prevent blocking. - Improved network connection handling by utilizing a more efficient method for retrieving connections. - Introduced a new function to determine connection types based on protocol family. --- agent/utils/websocket/client.go | 34 ++++++------- agent/utils/websocket/process_data.go | 73 +++++++++++++++++++-------- 2 files changed, 68 insertions(+), 39 deletions(-) diff --git a/agent/utils/websocket/client.go b/agent/utils/websocket/client.go index df1b8f6df50c..73fb261815be 100644 --- a/agent/utils/websocket/client.go +++ b/agent/utils/websocket/client.go @@ -1,6 +1,7 @@ package websocket import ( + "sync" "sync/atomic" "github.com/gorilla/websocket" @@ -9,25 +10,31 @@ import ( const MaxMessageQuenue = 32 type Client struct { - ID string - Socket *websocket.Conn - Msg chan []byte - closed atomic.Bool + ID string + Socket *websocket.Conn + Msg chan []byte + closed atomic.Bool + closeOnce sync.Once } func NewWsClient(ID string, socket *websocket.Conn) *Client { return &Client{ ID: ID, Socket: socket, - Msg: make(chan []byte, 32), - closed: atomic.Bool{}, + Msg: make(chan []byte, MaxMessageQuenue), } } +func (c *Client) Close() { + c.closeOnce.Do(func() { + c.closed.Store(true) + close(c.Msg) + c.Socket.Close() + }) +} + func (c *Client) Read() { - defer func() { - c.Close() - }() + defer c.Close() for { _, message, err := c.Socket.ReadMessage() if err != nil { @@ -38,9 +45,6 @@ func (c *Client) Read() { } func (c *Client) Write() { - defer func() { - c.Socket.Close() - }() for { message, ok := <-c.Msg if !ok { @@ -56,12 +60,6 @@ func (c *Client) SendPayload(res []byte) { } select { case c.Msg <- res: - return default: - select { - case <-c.Msg: - default: - } - c.Msg <- res } } diff --git a/agent/utils/websocket/process_data.go b/agent/utils/websocket/process_data.go index 7d09bed80067..39218775e25f 100644 --- a/agent/utils/websocket/process_data.go +++ b/agent/utils/websocket/process_data.go @@ -15,6 +15,8 @@ import ( "github.com/shirou/gopsutil/v4/process" ) +const defaultTimeout = 10 * time.Second + type WsInput struct { Type string `json:"type"` DownloadProgress @@ -204,7 +206,8 @@ func handleProcessData(proc *process.Process, processConfig *PsProcessConfig, pi } func getProcessData(processConfig PsProcessConfig) (res []byte, err error) { - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() processes, err := process.ProcessesWithContext(ctx) if err != nil { @@ -243,7 +246,10 @@ func getSSHSessions(config SSHSessionConfig) (res []byte, err error) { users []host.UserStat processes []*process.Process ) - users, err = host.Users() + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + + users, err = host.UsersWithContext(ctx) if err != nil { res, err = json.Marshal(result) return @@ -268,8 +274,9 @@ func getSSHSessions(config SSHSessionConfig) (res []byte, err error) { return } - processes, err = process.Processes() + processes, err = process.ProcessesWithContext(ctx) if err != nil { + res, err = json.Marshal(result) return } @@ -312,12 +319,14 @@ func getSSHSessions(config SSHSessionConfig) (res []byte, err error) { return } -var netTypes = [...]string{"tcp", "udp", "tcp6", "udp6"} - func getNetConnections(config NetConfig) (res []byte, err error) { - ctx := context.Background() + result := make([]ProcessConnect, 0) + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + processes, err := process.ProcessesWithContext(ctx) if err != nil { + res, _ = json.Marshal(result) return } @@ -336,24 +345,46 @@ func getNetConnections(config NetConfig) (res []byte, err error) { procPidMap[proc.Pid] = name } - result := make([]ProcessConnect, 0, len(processes)) - for _, netType := range netTypes { - connections, err := net.ConnectionsWithContext(ctx, netType) - if err == nil { - for _, conn := range connections { - if name, ok := procPidMap[conn.Pid]; ok { - result = append(result, ProcessConnect{ - Type: netType, - Status: conn.Status, - Laddr: conn.Laddr, - Raddr: conn.Raddr, - PID: conn.Pid, - Name: name, - }) - } + connections, err := net.ConnectionsMaxWithContext(ctx, "all", 32768) + if err != nil { + res, _ = json.Marshal(result) + return + } + + for _, conn := range connections { + if conn.Family != 2 && conn.Family != 10 { + continue + } + if name, ok := procPidMap[conn.Pid]; ok { + connType := getConnectionType(conn.Type, conn.Family) + if config.Port > 0 && conn.Laddr.Port != config.Port { + continue } + result = append(result, ProcessConnect{ + Type: connType, + Status: conn.Status, + Laddr: conn.Laddr, + Raddr: conn.Raddr, + PID: conn.Pid, + Name: name, + }) } } res, _ = json.Marshal(result) return } + +func getConnectionType(connType uint32, family uint32) string { + switch { + case connType == 1 && family == 2: + return "tcp" + case connType == 1 && family == 10: + return "tcp6" + case connType == 2 && family == 2: + return "udp" + case connType == 2 && family == 10: + return "udp6" + default: + return "unknown" + } +} From 8d28f5b8873f9c9dfa111b690e7068c8c3e181cb Mon Sep 17 00:00:00 2001 From: HynoR <20227709+HynoR@users.noreply.github.com> Date: Tue, 9 Dec 2025 10:19:36 +0800 Subject: [PATCH 3/6] feat: Enhance network connection retrieval and process name mapping - Updated getNetConnections function to improve efficiency by using maps for process names and connections. - Introduced a new helper function to retrieve process names from the filesystem or process context. - Enhanced filtering logic for network connections based on process ID, name, and port. - Increased initial capacity for connection results to optimize performance. --- agent/utils/websocket/process_data.go | 69 +++++++++++++++++---------- 1 file changed, 45 insertions(+), 24 deletions(-) diff --git a/agent/utils/websocket/process_data.go b/agent/utils/websocket/process_data.go index 39218775e25f..6a8a66dfaa54 100644 --- a/agent/utils/websocket/process_data.go +++ b/agent/utils/websocket/process_data.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "os" "strings" "time" @@ -320,60 +321,80 @@ func getSSHSessions(config SSHSessionConfig) (res []byte, err error) { } func getNetConnections(config NetConfig) (res []byte, err error) { - result := make([]ProcessConnect, 0) + result := make([]ProcessConnect, 0, 1024) ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) defer cancel() - processes, err := process.ProcessesWithContext(ctx) + connections, err := net.ConnectionsMaxWithContext(ctx, "all", 32768) if err != nil { res, _ = json.Marshal(result) return } - procPidMap := make(map[int32]string, len(processes)) - for _, proc := range processes { - name, _ := proc.Name() - if name == "" { + pidConnectionsMap := make(map[int32][]net.ConnectionStat, 256) + pidNameMap := make(map[int32]string, 256) + + for _, conn := range connections { + if conn.Family != 2 && conn.Family != 10 { continue } - if config.ProcessName != "" && !strings.Contains(name, config.ProcessName) { + + if conn.Pid == 0 { continue } - if config.ProcessID > 0 && config.ProcessID != proc.Pid { + + if config.ProcessID > 0 && conn.Pid != config.ProcessID { continue } - procPidMap[proc.Pid] = name - } - connections, err := net.ConnectionsMaxWithContext(ctx, "all", 32768) - if err != nil { - res, _ = json.Marshal(result) - return - } - - for _, conn := range connections { - if conn.Family != 2 && conn.Family != 10 { + if config.Port > 0 && conn.Laddr.Port != config.Port && conn.Raddr.Port != config.Port { continue } - if name, ok := procPidMap[conn.Pid]; ok { - connType := getConnectionType(conn.Type, conn.Family) - if config.Port > 0 && conn.Laddr.Port != config.Port { + + if _, exists := pidNameMap[conn.Pid]; !exists { + pName, _ := getProcessNameWithContext(ctx, conn.Pid) + if pName == "" { continue } + if config.ProcessName != "" && !strings.Contains(pName, config.ProcessName) { + continue + } + pidNameMap[conn.Pid] = pName + } + + pidConnectionsMap[conn.Pid] = append(pidConnectionsMap[conn.Pid], conn) + } + + for pid, connections := range pidConnectionsMap { + for _, conn := range connections { result = append(result, ProcessConnect{ - Type: connType, + Type: getConnectionType(conn.Type, conn.Family), Status: conn.Status, Laddr: conn.Laddr, Raddr: conn.Raddr, PID: conn.Pid, - Name: name, + Name: pidNameMap[pid], }) } } - res, _ = json.Marshal(result) + + res, err = json.Marshal(result) return } +func getProcessNameWithContext(ctx context.Context, pid int32) (string, error) { + data, err := os.ReadFile(fmt.Sprintf("/proc/%d/comm", pid)) + if err != nil || len(data) == 0 { + p, err := process.NewProcessWithContext(ctx, pid) + if err != nil { + return "", err + } + return p.Name() + } + return strings.TrimSpace(string(data)), nil + +} + func getConnectionType(connType uint32, family uint32) string { switch { case connType == 1 && family == 2: From 3d29c628abf753738d14e943b2d425e8f4216187 Mon Sep 17 00:00:00 2001 From: HynoR <20227709+HynoR@users.noreply.github.com> Date: Tue, 9 Dec 2025 10:21:51 +0800 Subject: [PATCH 4/6] refactor: Rename SendPayload method to Send in WebSocket client - Updated the SendPayload method to be more succinctly named Send for clarity. - Ensured the method continues to handle message sending while maintaining existing functionality. --- agent/utils/websocket/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/utils/websocket/client.go b/agent/utils/websocket/client.go index 73fb261815be..8dcb6497caa5 100644 --- a/agent/utils/websocket/client.go +++ b/agent/utils/websocket/client.go @@ -54,7 +54,7 @@ func (c *Client) Write() { } } -func (c *Client) SendPayload(res []byte) { +func (c *Client) Send(res []byte) { if c.closed.Load() { return } From 932ab0ee8365b57fab464a5426cf410ef0e5c2be Mon Sep 17 00:00:00 2001 From: HynoR <20227709+HynoR@users.noreply.github.com> Date: Tue, 9 Dec 2025 10:36:09 +0800 Subject: [PATCH 5/6] refactor: Update ProcessData and getNetConnections for improved clarity and efficiency - Replaced SendPayload method calls with Send for consistency in WebSocket message handling. - Enhanced getNetConnections function by refining process name retrieval and filtering logic. - Improved error handling in getProcessNameWithContext for better robustness. --- agent/utils/websocket/process_data.go | 34 +++++++++++++-------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/agent/utils/websocket/process_data.go b/agent/utils/websocket/process_data.go index 6a8a66dfaa54..eddf5518ecc8 100644 --- a/agent/utils/websocket/process_data.go +++ b/agent/utils/websocket/process_data.go @@ -116,25 +116,25 @@ func ProcessData(c *Client, inputMsg []byte) { if err != nil { return } - c.SendPayload(res) + c.Send(res) case "ps": res, err := getProcessData(wsInput.PsProcessConfig) if err != nil { return } - c.SendPayload(res) + c.Send(res) case "ssh": res, err := getSSHSessions(wsInput.SSHSessionConfig) if err != nil { return } - c.SendPayload(res) + c.Send(res) case "net": res, err := getNetConnections(wsInput.NetConfig) if err != nil { return } - c.SendPayload(res) + c.Send(res) } } @@ -354,10 +354,7 @@ func getNetConnections(config NetConfig) (res []byte, err error) { if _, exists := pidNameMap[conn.Pid]; !exists { pName, _ := getProcessNameWithContext(ctx, conn.Pid) if pName == "" { - continue - } - if config.ProcessName != "" && !strings.Contains(pName, config.ProcessName) { - continue + pName = "" } pidNameMap[conn.Pid] = pName } @@ -366,6 +363,10 @@ func getNetConnections(config NetConfig) (res []byte, err error) { } for pid, connections := range pidConnectionsMap { + pName := pidNameMap[pid] + if config.ProcessName != "" && !strings.Contains(pName, config.ProcessName) { + continue + } for _, conn := range connections { result = append(result, ProcessConnect{ Type: getConnectionType(conn.Type, conn.Family), @@ -373,7 +374,7 @@ func getNetConnections(config NetConfig) (res []byte, err error) { Laddr: conn.Laddr, Raddr: conn.Raddr, PID: conn.Pid, - Name: pidNameMap[pid], + Name: pName, }) } } @@ -384,15 +385,14 @@ func getNetConnections(config NetConfig) (res []byte, err error) { func getProcessNameWithContext(ctx context.Context, pid int32) (string, error) { data, err := os.ReadFile(fmt.Sprintf("/proc/%d/comm", pid)) - if err != nil || len(data) == 0 { - p, err := process.NewProcessWithContext(ctx, pid) - if err != nil { - return "", err - } - return p.Name() + if err == nil && len(data) > 0 { + return strings.TrimSpace(string(data)), nil } - return strings.TrimSpace(string(data)), nil - + p, err := process.NewProcessWithContext(ctx, pid) + if err != nil { + return "", err + } + return p.Name() } func getConnectionType(connType uint32, family uint32) string { From 33172e5d11973d197d8e4c23d21a8190c73c9fb7 Mon Sep 17 00:00:00 2001 From: HynoR <20227709+HynoR@users.noreply.github.com> Date: Tue, 9 Dec 2025 10:42:40 +0800 Subject: [PATCH 6/6] refactor: Simplify WebSocket client closure and reading logic - Removed unnecessary synchronization for closing the WebSocket client. - Updated the Read method to handle message reading directly without a separate Close method. - Ensured the Socket is closed properly after reading messages to prevent resource leaks. --- agent/utils/websocket/client.go | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/agent/utils/websocket/client.go b/agent/utils/websocket/client.go index 8dcb6497caa5..e9fdf9507a36 100644 --- a/agent/utils/websocket/client.go +++ b/agent/utils/websocket/client.go @@ -1,7 +1,6 @@ package websocket import ( - "sync" "sync/atomic" "github.com/gorilla/websocket" @@ -10,11 +9,10 @@ import ( const MaxMessageQuenue = 32 type Client struct { - ID string - Socket *websocket.Conn - Msg chan []byte - closed atomic.Bool - closeOnce sync.Once + ID string + Socket *websocket.Conn + Msg chan []byte + closed atomic.Bool } func NewWsClient(ID string, socket *websocket.Conn) *Client { @@ -25,16 +23,11 @@ func NewWsClient(ID string, socket *websocket.Conn) *Client { } } -func (c *Client) Close() { - c.closeOnce.Do(func() { +func (c *Client) Read() { + defer func() { c.closed.Store(true) close(c.Msg) - c.Socket.Close() - }) -} - -func (c *Client) Read() { - defer c.Close() + }() for { _, message, err := c.Socket.ReadMessage() if err != nil { @@ -45,6 +38,7 @@ func (c *Client) Read() { } func (c *Client) Write() { + defer c.Socket.Close() for { message, ok := <-c.Msg if !ok {