Skip to content
This repository was archived by the owner on Jun 11, 2025. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions apps/websocket-server/internal/domain/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"

"github.com/gofiber/websocket/v2"
Expand Down Expand Up @@ -77,7 +78,7 @@ type LogsReqData struct {
Since *string `json:"since,omitempty"`
}

func (d *domain) newJetstreamConsumerForLog(ctx context.Context, subject string, consumetId string, since *string) (*msg_nats.JetstreamConsumer, error) {
func (d *domain) newJetstreamConsumerForLog(ctx context.Context, subject string, consumerId string, since *string) (*msg_nats.JetstreamConsumer, error) {
t, err := parseSince(since)
if err != nil {
return nil, errors.NewE(err)
Expand All @@ -89,7 +90,7 @@ func (d *domain) newJetstreamConsumerForLog(ctx context.Context, subject string,
ConsumerConfig: msg_nats.ConsumerConfig{
DeliverPolicy: jetstream.DeliverByStartTimePolicy,
OptStartTime: t,
Name: consumetId,
Name: consumerId,
Description: "this is an ephemeral consumer which dispatches logs to a websocket client",
FilterSubjects: []string{
subject,
Expand All @@ -101,7 +102,7 @@ func (d *domain) newJetstreamConsumerForLog(ctx context.Context, subject string,
return msg_nats.NewJetstreamConsumer(ctx, d.jetStreamClient, msg_nats.JetstreamConsumerArgs{
Stream: d.env.LogsStreamName,
ConsumerConfig: msg_nats.ConsumerConfig{
Name: consumetId,
Name: consumerId,
Description: "this is an ephemeral consumer which dispatches logs to a websocket client",
FilterSubjects: []string{
subject,
Expand All @@ -116,7 +117,7 @@ func getLogHash(ld LogsReqData, userId repos.ID) string {
}

func (d *domain) getLogSubsId(ld LogsReqData) string {
return fmt.Sprintf("%s.%s.%s.%s", d.env.LogsStreamName, ld.AccountName, ld.ClusterName, ld.TrackingId)
return fmt.Sprintf("%s.%s.%s.%s.>", d.env.LogsStreamName, ld.AccountName, ld.ClusterName, ld.TrackingId)
}

func (d *domain) HandleWebSocketForLogs(ctx context.Context, c *websocket.Conn) error {
Expand Down Expand Up @@ -158,6 +159,7 @@ func (d *domain) HandleWebSocketForLogs(ctx context.Context, c *websocket.Conn)
type MessageResponse struct {
Timestamp time.Time `json:"timestamp"`
Message string `json:"message"`
Container *string `json:"container,omitempty"`
Type MessageType `json:"type"`
}

Expand Down Expand Up @@ -239,15 +241,15 @@ func (d *domain) HandleWebSocketForLogs(ctx context.Context, c *websocket.Conn)
if err := jc.Consume(
func(msg *types.ConsumeMsg) error {
if c != nil {

var resp MessageResponse
if err := json.Unmarshal(msg.Payload, &resp); err != nil {
if err := writeError(c, err); err != nil {
log.Warnf("websocket write: %w", err)
}
}

resp.Type = MessageTypeLog
sp := strings.Split(msg.Subject, ".")
resp.Container = &sp[len(sp)-1]
if err := c.WriteJSON(resp); err != nil {
log.Warnf("websocket write: %w", err)
}
Expand Down