From 3f1825ffa8c953ee2464bcb7bbfb9116f0597623 Mon Sep 17 00:00:00 2001 From: Abdhesh Nayak Date: Wed, 7 Feb 2024 18:57:22 +0530 Subject: [PATCH 1/2] :bug: Fixed issue with memory leak on socket-server --- apps/websocket-server/internal/domain/logs.go | 82 ++++++++++--------- 1 file changed, 42 insertions(+), 40 deletions(-) diff --git a/apps/websocket-server/internal/domain/logs.go b/apps/websocket-server/internal/domain/logs.go index 232a0dd69..8f171691b 100644 --- a/apps/websocket-server/internal/domain/logs.go +++ b/apps/websocket-server/internal/domain/logs.go @@ -242,54 +242,56 @@ func (d *domain) HandleWebSocketForLogs(ctx context.Context, c *websocket.Conn) continue } - if err := jc.Consume( - func(msg *types.ConsumeMsg) error { - if c != nil { - var resp MessageResponse - if err := json.Unmarshal(msg.Payload, &resp); err != nil { - if err := writeError(c, err); err != nil { + resources[hash] = &Subscription{ + resource: msg.Data, + jc: jc, + open: true, + } + + go func() { + + if err := writeInfo(c, "subscribed to logs"); err != nil { + log.Warnf("websocket write: %w", err) + } + + if err := jc.Consume( + func(msg *types.ConsumeMsg) error { + if c != nil { + var resp MessageResponse + if err := json.Unmarshal(msg.Payload, &resp); err != nil { + if err := writeError(c, err); err != nil { + log.Warnf("websocket write: %w", err) + } + } + resp.Type = MessageTypeLog + sp := strings.Split(msg.Subject, ".") + resp.Spec = &MsgSpec{ + PodName: sp[len(sp)-2], + ContainerName: sp[len(sp)-1], + } + if err := c.WriteJSON(resp); err != nil { log.Warnf("websocket write: %w", err) } } - resp.Type = MessageTypeLog - sp := strings.Split(msg.Subject, ".") - resp.Spec = &MsgSpec{ - PodName: sp[len(sp)-2], - ContainerName: sp[len(sp)-1], - } - if err := c.WriteJSON(resp); err != nil { - log.Warnf("websocket write: %w", err) - } - } - return nil - }, - types.ConsumeOpts{ - OnError: func(err error) error { - if err := writeError(c, err); err != nil { - log.Warnf("websocket write: %w", err) - } + return nil + }, + types.ConsumeOpts{ + OnError: func(err error) error { + if err := writeError(c, err); err != nil { + log.Warnf("websocket write: %w", err) + } - return err + return err + }, }, - }, - ); err != nil { - if err := writeError(c, err); err != nil { - log.Warnf("websocket write: %w", err) + ); err != nil { + if err := writeError(c, err); err != nil { + log.Warnf("websocket write: %w", err) + } } - continue - } - - if err := writeInfo(c, "subscribed to logs"); err != nil { - log.Warnf("websocket write: %w", err) - } - - resources[hash] = &Subscription{ - resource: msg.Data, - jc: jc, - open: true, - } + }() case "unsubscribe": if _, ok := resources[hash]; !ok { From 46d5e9789bfa21b60b51b60fb13c16db22b3ff01 Mon Sep 17 00:00:00 2001 From: Abdhesh Nayak Date: Thu, 8 Feb 2024 17:46:50 +0530 Subject: [PATCH 2/2] :bug: Fixed issue with memory leak with websocket --- apps/websocket-server/internal/domain/logs.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/apps/websocket-server/internal/domain/logs.go b/apps/websocket-server/internal/domain/logs.go index 8f171691b..39e5e2abd 100644 --- a/apps/websocket-server/internal/domain/logs.go +++ b/apps/websocket-server/internal/domain/logs.go @@ -142,6 +142,16 @@ func (d *domain) HandleWebSocketForLogs(ctx context.Context, c *websocket.Conn) resources := make(map[string]*Subscription) + defer func() { + for _, v := range resources { + if v.jc != nil { + if err := v.jc.Stop(ctx); err != nil { + log.Warnf("stop jetstream consumer: %w", err) + } + } + } + }() + type Message struct { Event string `json:"event"` Data LogsReqData `json:"data"`