From 68fec8c6948762c3db3086f2289628bd0579bac0 Mon Sep 17 00:00:00 2001 From: Abdhesh Nayak Date: Mon, 5 Feb 2024 16:41:00 +0530 Subject: [PATCH] :bug: Fixed issue with logs, where container_name needed to parse --- apps/websocket-server/internal/domain/logs.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/apps/websocket-server/internal/domain/logs.go b/apps/websocket-server/internal/domain/logs.go index 513b7b947..13e49a10c 100644 --- a/apps/websocket-server/internal/domain/logs.go +++ b/apps/websocket-server/internal/domain/logs.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "strconv" + "strings" "time" "github.com/gofiber/websocket/v2" @@ -77,7 +78,7 @@ type LogsReqData struct { Since *string `json:"since,omitempty"` } -func (d *domain) newJetstreamConsumerForLog(ctx context.Context, subject string, consumetId string, since *string) (*msg_nats.JetstreamConsumer, error) { +func (d *domain) newJetstreamConsumerForLog(ctx context.Context, subject string, consumerId string, since *string) (*msg_nats.JetstreamConsumer, error) { t, err := parseSince(since) if err != nil { return nil, errors.NewE(err) @@ -89,7 +90,7 @@ func (d *domain) newJetstreamConsumerForLog(ctx context.Context, subject string, ConsumerConfig: msg_nats.ConsumerConfig{ DeliverPolicy: jetstream.DeliverByStartTimePolicy, OptStartTime: t, - Name: consumetId, + Name: consumerId, Description: "this is an ephemeral consumer which dispatches logs to a websocket client", FilterSubjects: []string{ subject, @@ -101,7 +102,7 @@ func (d *domain) newJetstreamConsumerForLog(ctx context.Context, subject string, return msg_nats.NewJetstreamConsumer(ctx, d.jetStreamClient, msg_nats.JetstreamConsumerArgs{ Stream: d.env.LogsStreamName, ConsumerConfig: msg_nats.ConsumerConfig{ - Name: consumetId, + Name: consumerId, Description: "this is an ephemeral consumer which dispatches logs to a websocket client", FilterSubjects: []string{ subject, @@ -116,7 +117,7 @@ func getLogHash(ld LogsReqData, userId repos.ID) string { } func (d *domain) getLogSubsId(ld LogsReqData) string { - return fmt.Sprintf("%s.%s.%s.%s", d.env.LogsStreamName, ld.AccountName, ld.ClusterName, ld.TrackingId) + return fmt.Sprintf("%s.%s.%s.%s.>", d.env.LogsStreamName, ld.AccountName, ld.ClusterName, ld.TrackingId) } func (d *domain) HandleWebSocketForLogs(ctx context.Context, c *websocket.Conn) error { @@ -158,6 +159,7 @@ func (d *domain) HandleWebSocketForLogs(ctx context.Context, c *websocket.Conn) type MessageResponse struct { Timestamp time.Time `json:"timestamp"` Message string `json:"message"` + Container *string `json:"container,omitempty"` Type MessageType `json:"type"` } @@ -239,15 +241,15 @@ func (d *domain) HandleWebSocketForLogs(ctx context.Context, c *websocket.Conn) if err := jc.Consume( func(msg *types.ConsumeMsg) error { if c != nil { - var resp MessageResponse if err := json.Unmarshal(msg.Payload, &resp); err != nil { if err := writeError(c, err); err != nil { log.Warnf("websocket write: %w", err) } } - resp.Type = MessageTypeLog + sp := strings.Split(msg.Subject, ".") + resp.Container = &sp[len(sp)-1] if err := c.WriteJSON(resp); err != nil { log.Warnf("websocket write: %w", err) }