diff --git a/apps/websocket-server/internal/domain/logs.go b/apps/websocket-server/internal/domain/logs.go index 8d0caea52..3c5fe0fbc 100644 --- a/apps/websocket-server/internal/domain/logs.go +++ b/apps/websocket-server/internal/domain/logs.go @@ -111,30 +111,25 @@ func (d *domain) handleLogsMsg(ctx types.Context, logsSubs *logs.LogsSubsMap, ms if err := jc.Consume( func(m *msg_types.ConsumeMsg) error { - if ctx.Connection != nil { - var data logs.Response - var resp types.Response[logs.Response] - if err := json.Unmarshal(m.Payload, &data); err != nil { - return err - } - - resp.Type = types.MessageTypeResponse - resp.Id = msg.Id - sp := strings.Split(m.Subject, ".") - - data.PodName = sp[len(sp)-2] - data.ContainerName = sp[len(sp)-1] - - resp.Data = data - resp.For = types.ForLogs - - ctx.Mutex.Lock() - if ctx.Connection != nil { - if err := ctx.Connection.WriteJSON(resp); err != nil { - log.Warnf("websocket write: %w", err) - } - } - ctx.Mutex.Unlock() + + var data logs.Response + var resp types.Response[logs.Response] + if err := json.Unmarshal(m.Payload, &data); err != nil { + return err + } + + resp.Type = types.MessageTypeResponse + resp.Id = msg.Id + sp := strings.Split(m.Subject, ".") + + data.PodName = sp[len(sp)-2] + data.ContainerName = sp[len(sp)-1] + + resp.Data = data + resp.For = types.ForLogs + + if err := ctx.WriteJSON(resp); err != nil { + log.Warnf("websocket write: %w", err) } return nil diff --git a/apps/websocket-server/internal/domain/main.go b/apps/websocket-server/internal/domain/main.go index e5ad3b7a4..2ead4a344 100644 --- a/apps/websocket-server/internal/domain/main.go +++ b/apps/websocket-server/internal/domain/main.go @@ -27,6 +27,19 @@ func (d *domain) HandleWebSocket(ctx context.Context, c *websocket.Conn) error { logsSubs := &logs.LogsSubsMap{} rWatchSubs := &res_watch.ResWatchSubsMap{} + write := func(msg interface{}) error { + if c != nil { + mu.Lock() + if err := c.WriteJSON(msg); err != nil { + d.logger.Warnf("websocket write: %w", err) + } + mu.Unlock() + return nil + } + + return fmt.Errorf("connection is closed") + } + defer func() { if err := c.Close(); err != nil { d.logger.Warnf("websocket close: %w", err) @@ -63,10 +76,10 @@ func (d *domain) HandleWebSocket(ctx context.Context, c *websocket.Conn) error { }) sc := types.Context{ - Context: ctx, - Session: sess, - Connection: c, - Mutex: &mu, + Context: ctx, + Session: sess, + // Connection: c, + Mutex: &mu, } for { @@ -90,10 +103,12 @@ func (d *domain) HandleWebSocket(ctx context.Context, c *websocket.Conn) error { switch msg.For { case types.ForLogs: if err := d.handleLogsMsg(types.Context{ - Context: ctx, - Session: sess, - Connection: c, - Mutex: &mu, + Context: ctx, + Session: sess, + // Connection: c, + Mutex: &mu, + Logger: d.logger, + WriteJSON: write, }, logsSubs, msg.Data); err != nil { utils.WriteError(sc, err, "", types.ForLogs) } @@ -101,8 +116,11 @@ func (d *domain) HandleWebSocket(ctx context.Context, c *websocket.Conn) error { case types.ForResourceUpdate: if err := d.handleResWatchMsg(types.Context{ Context: ctx, - Session: sess, - Mutex: &mu, + // Connection: c, + Session: sess, + Mutex: &mu, + Logger: d.logger, + WriteJSON: write, }, rWatchSubs, msg.Data); err != nil { utils.WriteError(sc, err, "", types.ForResourceUpdate) } diff --git a/apps/websocket-server/internal/domain/resource-update.go b/apps/websocket-server/internal/domain/resource-update.go index 77d10b635..7b567c7fd 100644 --- a/apps/websocket-server/internal/domain/resource-update.go +++ b/apps/websocket-server/internal/domain/resource-update.go @@ -50,6 +50,7 @@ func (d *domain) checkAccess(ctx context.Context, rdata *res_watch.ReqData, user } func (d *domain) handleResWatchMsg(ctx types.Context, resources *res_watch.ResWatchSubsMap, msgAny map[string]any) error { + var msg res_watch.Message b, err := json.Marshal(msgAny) if err != nil { @@ -81,7 +82,18 @@ func (d *domain) handleResWatchMsg(ctx types.Context, resources *res_watch.ResWa return fmt.Errorf("resource already subscribed") } - s, err := d.natsClient.Conn.Subscribe(rd.Topic, func(m *mnats.Msg) {}) + s, err := d.natsClient.Conn.Subscribe(rd.Topic, func(m *mnats.Msg) { + if err := ctx.WriteJSON(types.Response[res_watch.Response]{ + Type: types.MessageTypeResponse, + For: types.ForResourceUpdate, + Data: res_watch.Response{}, + Message: "update", + Id: msg.Id, + }); err != nil { + utils.WriteError(ctx, err, msg.Id, types.ForResourceUpdate) + } + }) + if err != nil { return err } diff --git a/apps/websocket-server/internal/domain/resource_watch/main.go b/apps/websocket-server/internal/domain/resource_watch/main.go index a8ebb202f..4bbe14667 100644 --- a/apps/websocket-server/internal/domain/resource_watch/main.go +++ b/apps/websocket-server/internal/domain/resource_watch/main.go @@ -35,6 +35,9 @@ type ReqData struct { ReqTopic string `json:"req_topic"` } +type Response struct{ +} + func ParseReq(rt string) (*ReqData, error) { entriesStrs := strings.Split(rt, ".") diff --git a/apps/websocket-server/internal/domain/types/main.go b/apps/websocket-server/internal/domain/types/main.go index ee17080d2..c569e0c76 100644 --- a/apps/websocket-server/internal/domain/types/main.go +++ b/apps/websocket-server/internal/domain/types/main.go @@ -4,8 +4,8 @@ import ( "context" "sync" - "github.com/gofiber/websocket/v2" "github.com/kloudlite/api/common" + "github.com/kloudlite/api/pkg/logging" ) type For string @@ -38,8 +38,10 @@ type Message struct { } type Context struct { - Context context.Context - Session *common.AuthSession - Connection *websocket.Conn - Mutex *sync.Mutex + Logger logging.Logger + Context context.Context + Session *common.AuthSession + // Connection *websocket.Conn + Mutex *sync.Mutex + WriteJSON func(interface{}) error } diff --git a/apps/websocket-server/internal/domain/utils/main.go b/apps/websocket-server/internal/domain/utils/main.go index b324174b1..286b73c0d 100644 --- a/apps/websocket-server/internal/domain/utils/main.go +++ b/apps/websocket-server/internal/domain/utils/main.go @@ -1,38 +1,27 @@ package utils import ( - "github.com/gofiber/fiber/v2/log" "github.com/kloudlite/api/apps/websocket-server/internal/domain/types" ) func WriteError(ctx types.Context, err error, id string, For types.For) { - if ctx.Context != nil { - ctx.Mutex.Lock() - if err := ctx.Connection.WriteJSON(types.Response[any]{ - Type: types.MessageTypeError, - Message: err.Error(), - For: For, - Id: id, - }); err != nil { - log.Warnf("websocket write: %w", err) - } - ctx.Mutex.Unlock() + if err := ctx.WriteJSON(types.Response[any]{ + Type: types.MessageTypeError, + Message: err.Error(), + For: For, + Id: id, + }); err != nil { + ctx.Logger.Warnf("websocket write: %w", err) } } func WriteInfo(ctx types.Context, msg string, id string, For types.For) { - if ctx.Context != nil { - ctx.Mutex.Lock() - if err := ctx.Connection.WriteJSON(types.Response[any]{ - Type: types.MessageTypeInfo, - Message: msg, - Id: id, - For: For, - }); err != nil { - log.Warnf("websocket write: %w", err) - } - ctx.Mutex.Unlock() - } else { - log.Warnf("websocket connection is nil") + if err := ctx.WriteJSON(types.Response[any]{ + Type: types.MessageTypeInfo, + Message: msg, + Id: id, + For: For, + }); err != nil { + ctx.Logger.Warnf("websocket write: %w", err) } }