From ed10c3bc3c979d4f1a1c3a64c9d7c07af32be678 Mon Sep 17 00:00:00 2001 From: patterniha <71074308+patterniha@users.noreply.github.com> Date: Wed, 27 Aug 2025 15:09:09 +0330 Subject: [PATCH 1/3] Trojan uot: fix memory/goroutine leak --- proxy/trojan/server.go | 10 ++++++++-- transport/internet/udp/dispatcher.go | 4 ++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/proxy/trojan/server.go b/proxy/trojan/server.go index 0ce14408d5b1..1e8ec3a5b8fd 100644 --- a/proxy/trojan/server.go +++ b/proxy/trojan/server.go @@ -233,7 +233,7 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn stat.Con sessionPolicy = s.policyManager.ForLevel(user.Level) if destination.Network == net.Network_UDP { // handle udp request - return s.handleUDPPayload(ctx, &PacketReader{Reader: clientReader}, &PacketWriter{Writer: conn}, dispatcher) + return s.handleUDPPayload(ctx, &PacketReader{Reader: clientReader}, &PacketWriter{Writer: conn}, dispatcher, conn) } ctx = log.ContextWithAccessMessage(ctx, &log.AccessMessage{ @@ -248,7 +248,7 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn stat.Con return s.handleConnection(ctx, sessionPolicy, destination, clientReader, buf.NewWriter(conn), dispatcher) } -func (s *Server) handleUDPPayload(ctx context.Context, clientReader *PacketReader, clientWriter *PacketWriter, dispatcher routing.Dispatcher) error { +func (s *Server) handleUDPPayload(ctx context.Context, clientReader *PacketReader, clientWriter *PacketWriter, dispatcher routing.Dispatcher, conn stat.Connection) error { udpServer := udp.NewDispatcher(dispatcher, func(ctx context.Context, packet *udp_proto.Packet) { udpPayload := packet.Payload if udpPayload.UDP == nil { @@ -259,6 +259,12 @@ func (s *Server) handleUDPPayload(ctx context.Context, clientReader *PacketReade errors.LogWarningInner(ctx, err, "failed to write response") } }) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + udpServer.SetCallClose(func() error { + cancel() + return conn.Close() + }) defer udpServer.RemoveRay() inbound := session.InboundFromContext(ctx) diff --git a/transport/internet/udp/dispatcher.go b/transport/internet/udp/dispatcher.go index 22db4244b495..99690cc01fa7 100644 --- a/transport/internet/udp/dispatcher.go +++ b/transport/internet/udp/dispatcher.go @@ -41,6 +41,10 @@ func NewDispatcher(dispatcher routing.Dispatcher, callback ResponseCallback) *Di } } +func (v *Dispatcher) SetCallClose(f func() error) { + v.callClose = f +} + func (v *Dispatcher) RemoveRay() { v.Lock() defer v.Unlock() From 27be50a79c03f00c231211cb52dc9a54573880d7 Mon Sep 17 00:00:00 2001 From: patterniha <71074308+patterniha@users.noreply.github.com> Date: Wed, 27 Aug 2025 17:08:38 +0330 Subject: [PATCH 2/3] add `connIdle` --- proxy/trojan/server.go | 93 +++++++++++++++------------- transport/internet/udp/dispatcher.go | 4 -- 2 files changed, 51 insertions(+), 46 deletions(-) diff --git a/proxy/trojan/server.go b/proxy/trojan/server.go index 1e8ec3a5b8fd..ba2d3cbd5bf6 100644 --- a/proxy/trojan/server.go +++ b/proxy/trojan/server.go @@ -233,7 +233,7 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn stat.Con sessionPolicy = s.policyManager.ForLevel(user.Level) if destination.Network == net.Network_UDP { // handle udp request - return s.handleUDPPayload(ctx, &PacketReader{Reader: clientReader}, &PacketWriter{Writer: conn}, dispatcher, conn) + return s.handleUDPPayload(ctx, sessionPolicy, &PacketReader{Reader: clientReader}, &PacketWriter{Writer: conn}, dispatcher) } ctx = log.ContextWithAccessMessage(ctx, &log.AccessMessage{ @@ -248,7 +248,11 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn stat.Con return s.handleConnection(ctx, sessionPolicy, destination, clientReader, buf.NewWriter(conn), dispatcher) } -func (s *Server) handleUDPPayload(ctx context.Context, clientReader *PacketReader, clientWriter *PacketWriter, dispatcher routing.Dispatcher, conn stat.Connection) error { +func (s *Server) handleUDPPayload(ctx context.Context, sessionPolicy policy.Session, clientReader *PacketReader, clientWriter *PacketWriter, dispatcher routing.Dispatcher) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle) + defer timer.SetTimeout(0) udpServer := udp.NewDispatcher(dispatcher, func(ctx context.Context, packet *udp_proto.Packet) { udpPayload := packet.Payload if udpPayload.UDP == nil { @@ -257,14 +261,11 @@ func (s *Server) handleUDPPayload(ctx context.Context, clientReader *PacketReade if err := clientWriter.WriteMultiBuffer(buf.MultiBuffer{udpPayload}); err != nil { errors.LogWarningInner(ctx, err, "failed to write response") + cancel() + } else { + timer.Update() } }) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - udpServer.SetCallClose(func() error { - cancel() - return conn.Close() - }) defer udpServer.RemoveRay() inbound := session.InboundFromContext(ctx) @@ -272,47 +273,55 @@ func (s *Server) handleUDPPayload(ctx context.Context, clientReader *PacketReade var dest *net.Destination - for { - select { - case <-ctx.Done(): - return nil - default: - mb, err := clientReader.ReadMultiBuffer() - if err != nil { - if errors.Cause(err) != io.EOF { - return errors.New("unexpected EOF").Base(err) - } + requestDone := func() error { + for { + select { + case <-ctx.Done(): return nil - } + default: + mb, err := clientReader.ReadMultiBuffer() + if err != nil { + if errors.Cause(err) != io.EOF { + return errors.New("unexpected EOF").Base(err) + } + return nil + } - mb2, b := buf.SplitFirst(mb) - if b == nil { - continue - } - destination := *b.UDP - - currentPacketCtx := ctx - if inbound.Source.IsValid() { - currentPacketCtx = log.ContextWithAccessMessage(ctx, &log.AccessMessage{ - From: inbound.Source, - To: destination, - Status: log.AccessAccepted, - Reason: "", - Email: user.Email, - }) - } - errors.LogInfo(ctx, "tunnelling request to ", destination) + mb2, b := buf.SplitFirst(mb) + if b == nil { + continue + } + destination := *b.UDP + + currentPacketCtx := ctx + if inbound.Source.IsValid() { + currentPacketCtx = log.ContextWithAccessMessage(ctx, &log.AccessMessage{ + From: inbound.Source, + To: destination, + Status: log.AccessAccepted, + Reason: "", + Email: user.Email, + }) + } + errors.LogInfo(ctx, "tunnelling request to ", destination) - if !s.cone || dest == nil { - dest = &destination - } + if !s.cone || dest == nil { + dest = &destination + } - udpServer.Dispatch(currentPacketCtx, *dest, b) // first packet - for _, payload := range mb2 { - udpServer.Dispatch(currentPacketCtx, *dest, payload) + udpServer.Dispatch(currentPacketCtx, *dest, b) // first packet + for _, payload := range mb2 { + udpServer.Dispatch(currentPacketCtx, *dest, payload) + } } } + } + + if err := task.Run(ctx, requestDone); err != nil { + return err + } + return nil } func (s *Server) handleConnection(ctx context.Context, sessionPolicy policy.Session, diff --git a/transport/internet/udp/dispatcher.go b/transport/internet/udp/dispatcher.go index 99690cc01fa7..22db4244b495 100644 --- a/transport/internet/udp/dispatcher.go +++ b/transport/internet/udp/dispatcher.go @@ -41,10 +41,6 @@ func NewDispatcher(dispatcher routing.Dispatcher, callback ResponseCallback) *Di } } -func (v *Dispatcher) SetCallClose(f func() error) { - v.callClose = f -} - func (v *Dispatcher) RemoveRay() { v.Lock() defer v.Unlock() From 43493b6469bd81d6c7248a9e5e87a73f1e6744e5 Mon Sep 17 00:00:00 2001 From: patterniha <71074308+patterniha@users.noreply.github.com> Date: Wed, 27 Aug 2025 17:20:42 +0330 Subject: [PATCH 3/3] add forgotten timer.update --- proxy/trojan/server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/proxy/trojan/server.go b/proxy/trojan/server.go index ba2d3cbd5bf6..8ed3b0e6eecc 100644 --- a/proxy/trojan/server.go +++ b/proxy/trojan/server.go @@ -291,6 +291,7 @@ func (s *Server) handleUDPPayload(ctx context.Context, sessionPolicy policy.Sess if b == nil { continue } + timer.Update() destination := *b.UDP currentPacketCtx := ctx