From f7fd2e4aa67117287c5d12b586306503cc483e72 Mon Sep 17 00:00:00 2001 From: nxtcoder17 Date: Wed, 14 Feb 2024 12:43:50 +0530 Subject: [PATCH 1/2] fix(apps/websocket-server): deletes consumer, when unsubscribing --- .tools/nvim/dap/go.lua | 13 +++++++++++++ apps/websocket-server/Taskfile.yml | 2 +- apps/websocket-server/internal/app/app.go | 2 -- apps/websocket-server/internal/domain/logs.go | 7 +++++-- apps/websocket-server/internal/domain/main.go | 12 +++++++----- pkg/messaging/nats/jetstream-consumer.go | 9 ++++++++- 6 files changed, 34 insertions(+), 11 deletions(-) diff --git a/.tools/nvim/dap/go.lua b/.tools/nvim/dap/go.lua index ed6087381..8d398304d 100644 --- a/.tools/nvim/dap/go.lua +++ b/.tools/nvim/dap/go.lua @@ -171,4 +171,17 @@ dap.configurations.go = { vim.g.nxt.project_root_dir .. "/apps/tenant-agent" .. "/.secrets/env", }, }, + + { + type = "go", + name = "Debug Websocket Server", + request = "launch", + program = vim.g.nxt.project_root_dir .. "/apps/websocket-server", + args = { "--dev"}, + console = "externalTerminal", + -- externalTerminal = true, + envFile = { + vim.g.nxt.project_root_dir .. "/apps/websocket-server" .. "/.secrets/env", + }, + }, } diff --git a/apps/websocket-server/Taskfile.yml b/apps/websocket-server/Taskfile.yml index f7566450c..5fe7a3b0b 100644 --- a/apps/websocket-server/Taskfile.yml +++ b/apps/websocket-server/Taskfile.yml @@ -4,7 +4,7 @@ dotenv: - .secrets/env vars: - ImagePrefix: "ghcr.io/kloudlite/platform/apis" + ImagePrefix: "ghcr.io/kloudlite/api" tasks: build: diff --git a/apps/websocket-server/internal/app/app.go b/apps/websocket-server/internal/app/app.go index e0986aabe..ddbb06212 100644 --- a/apps/websocket-server/internal/app/app.go +++ b/apps/websocket-server/internal/app/app.go @@ -38,7 +38,6 @@ var Module = fx.Module("app", logr logging.Logger, sessionRepo kv.Repo[*common.AuthSession], ) { - a := server.Raw() a.Use( @@ -76,7 +75,6 @@ var Module = fx.Module("app", a.All("*", func(c *fiber.Ctx) error { return c.SendStatus(fiber.StatusNotFound) }) - }, ), ) diff --git a/apps/websocket-server/internal/domain/logs.go b/apps/websocket-server/internal/domain/logs.go index 0cd0708a2..8d0caea52 100644 --- a/apps/websocket-server/internal/domain/logs.go +++ b/apps/websocket-server/internal/domain/logs.go @@ -13,6 +13,7 @@ import ( "github.com/kloudlite/api/apps/websocket-server/internal/domain/types" "github.com/kloudlite/api/apps/websocket-server/internal/domain/utils" "github.com/kloudlite/api/pkg/errors" + "github.com/kloudlite/api/pkg/messaging/nats" msg_nats "github.com/kloudlite/api/pkg/messaging/nats" msg_types "github.com/kloudlite/api/pkg/messaging/types" @@ -106,7 +107,6 @@ func (d *domain) handleLogsMsg(ctx types.Context, logsSubs *logs.LogsSubsMap, ms } go func() { - utils.WriteInfo(ctx, "subscribed to logs", msg.Id, types.ForLogs) if err := jc.Consume( @@ -148,7 +148,6 @@ func (d *domain) handleLogsMsg(ctx types.Context, logsSubs *logs.LogsSubsMap, ms ); err != nil { utils.WriteError(ctx, err, msg.Id, types.ForLogs) } - }() } @@ -163,6 +162,10 @@ func (d *domain) handleLogsMsg(ctx types.Context, logsSubs *logs.LogsSubsMap, ms return err } + if err := nats.DeleteConsumer(ctx.Context, d.jetStreamClient, res.Jc); err != nil { + return err + } + delete(*logsSubs, hash) } ctx.Mutex.Unlock() diff --git a/apps/websocket-server/internal/domain/main.go b/apps/websocket-server/internal/domain/main.go index e3ab586b5..e5ad3b7a4 100644 --- a/apps/websocket-server/internal/domain/main.go +++ b/apps/websocket-server/internal/domain/main.go @@ -13,6 +13,7 @@ import ( "github.com/kloudlite/api/common" "github.com/kloudlite/api/pkg/errors" httpServer "github.com/kloudlite/api/pkg/http-server" + "github.com/kloudlite/api/pkg/messaging/nats" ) func (d *domain) HandleWebSocket(ctx context.Context, c *websocket.Conn) error { @@ -23,8 +24,8 @@ func (d *domain) HandleWebSocket(ctx context.Context, c *websocket.Conn) error { mu := sync.Mutex{} - var logsSubs = &logs.LogsSubsMap{} - var rWatchSubs = &res_watch.ResWatchSubsMap{} + logsSubs := &logs.LogsSubsMap{} + rWatchSubs := &res_watch.ResWatchSubsMap{} defer func() { if err := c.Close(); err != nil { @@ -35,7 +36,10 @@ func (d *domain) HandleWebSocket(ctx context.Context, c *websocket.Conn) error { for _, v := range *logsSubs { if v.Jc != nil { if err := v.Jc.Stop(ctx); err != nil { - d.logger.Warnf("stop jetstream consumer: %w", err) + d.logger.Warnf("stop jetstream consumer failed with err: %w", err) + } + if err := nats.DeleteConsumer(ctx, d.jetStreamClient, v.Jc); err != nil { + d.logger.Warnf("deleting jetstream consumer failed with err: %w", err) } } } @@ -50,7 +54,6 @@ func (d *domain) HandleWebSocket(ctx context.Context, c *websocket.Conn) error { } } } - }() closed := false @@ -67,7 +70,6 @@ func (d *domain) HandleWebSocket(ctx context.Context, c *websocket.Conn) error { } for { - if closed { break } diff --git a/pkg/messaging/nats/jetstream-consumer.go b/pkg/messaging/nats/jetstream-consumer.go index 5ddab7a32..eb193d2c5 100644 --- a/pkg/messaging/nats/jetstream-consumer.go +++ b/pkg/messaging/nats/jetstream-consumer.go @@ -2,10 +2,11 @@ package nats import ( "context" - "github.com/kloudlite/api/pkg/errors" "os" "os/signal" + "github.com/kloudlite/api/pkg/errors" + "github.com/kloudlite/api/pkg/messaging/types" "github.com/kloudlite/api/pkg/nats" "github.com/nats-io/nats.go/jetstream" @@ -13,6 +14,7 @@ import ( type JetstreamConsumer struct { name string + stream string client *nats.JetstreamClient consumer jetstream.Consumer consumeCtx jetstream.ConsumeContext @@ -114,5 +116,10 @@ func NewJetstreamConsumer(ctx context.Context, jc *nats.JetstreamClient, args Je name: args.ConsumerConfig.Name, client: jc, consumer: c, + stream: args.Stream, }, nil } + +func DeleteConsumer(ctx context.Context, jc *nats.JetstreamClient, consumer *JetstreamConsumer) error { + return jc.Jetstream.DeleteConsumer(ctx, consumer.stream, consumer.name) +} From e66963bf5336bd5da7c3196745e412197b619946 Mon Sep 17 00:00:00 2001 From: nxtcoder17 Date: Wed, 14 Feb 2024 12:59:02 +0530 Subject: [PATCH 2/2] ci: adds github action to delete container images from ghcr.io --- .github/workflows/delete-images-from-ghcr.yml | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 .github/workflows/delete-images-from-ghcr.yml diff --git a/.github/workflows/delete-images-from-ghcr.yml b/.github/workflows/delete-images-from-ghcr.yml new file mode 100644 index 000000000..05f708ac1 --- /dev/null +++ b/.github/workflows/delete-images-from-ghcr.yml @@ -0,0 +1,44 @@ +name: Delete Container Images from github container registry + +on: + workflow_dispatch: + inputs: + image_tag_to_delete: + type: string + description: "image tag to delete" + required: true + default: "v0.0.0" + +permissions: + contents: read + packages: write + +jobs: + docker-builds: + strategy: + matrix: + app: + - accounts + - auth + - comms + - console + - container-registry + - iam + - infra + - worker-audit-logging + - webhook + - websocket-server + - message-office + - tenant-agent + + name: Delete image from ghcr.io + runs-on: ubuntu-latest + steps: + - name: Delete image + uses: bots-house/ghcr-delete-image-action@v1.1.0 + with: + owner: ${{ github.repository_owner }} + name: ${{ matrix.app }} + + token: ${{ secrets.GITHUB_TOKEN }} + tag: ${{ github.event.inputs.image_tag_to_delete }}