Skip to content
This repository was archived by the owner on Jun 11, 2025. It is now read-only.
Merged
13 changes: 1 addition & 12 deletions .tools/nvim/__http__/console/apps.graphql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,10 @@ label: Create App
query: |+ #graphql
mutation Core_createApp($envName: String!, $app: AppIn!) {
core_createApp(envName: $envName, app: $app) {
id
metadata {
name
}
syncStatus {
state
recordVersion
error
action
lastSyncedAt
}
createdBy{
userId
userName
userEmail
}
}
}
variables:
Expand Down
6 changes: 4 additions & 2 deletions apps/accounts/internal/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package framework
import (
"context"
"fmt"
"log/slog"
"time"

"github.com/kloudlite/api/common"
"github.com/kloudlite/api/pkg/errors"
"github.com/kloudlite/api/pkg/nats"
"time"

"github.com/kloudlite/api/pkg/kv"
"github.com/kloudlite/api/pkg/repos"
Expand All @@ -33,7 +35,7 @@ var Module = fx.Module("framework",
return &fm{env: ev}
}),

fx.Provide(func(ev *env.Env, logger logging.Logger) (*nats.JetstreamClient, error) {
fx.Provide(func(ev *env.Env, logger *slog.Logger) (*nats.JetstreamClient, error) {
name := "accounts:jetstream-client"
nc, err := nats.NewClient(ev.NatsURL, nats.ClientOpts{
Name: name,
Expand Down
9 changes: 9 additions & 0 deletions apps/accounts/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"flag"
"log/slog"
"os"
"time"

Expand Down Expand Up @@ -35,6 +36,14 @@ func main() {
return logger
}),

fx.Provide(func() *slog.Logger {
return logging.NewSlogLogger(logging.SlogOptions{
ShowCaller: true,
ShowDebugLogs: isDev,
SetAsDefaultLogger: true,
})
}),

fx.Provide(func() (*env.Env, error) {
if e, err := env.LoadEnv(); err != nil {
return nil, errors.NewE(err)
Expand Down
4 changes: 3 additions & 1 deletion apps/auth/internal/framework/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package framework
import (
"context"
"fmt"
"log/slog"

"github.com/kloudlite/api/common"
"github.com/kloudlite/api/pkg/errors"
"github.com/kloudlite/api/pkg/kv"
Expand Down Expand Up @@ -57,7 +59,7 @@ var Module fx.Option = fx.Module(

repos.NewMongoClientFx[*fm](),

fx.Provide(func(ev *env.Env, logger logging.Logger) (*nats.JetstreamClient, error) {
fx.Provide(func(ev *env.Env, logger *slog.Logger) (*nats.JetstreamClient, error) {
name := "auth:jetstream-client"
nc, err := nats.NewClient(ev.NatsURL, nats.ClientOpts{
Name: name,
Expand Down
10 changes: 10 additions & 0 deletions apps/auth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"flag"
"log/slog"
"time"

"github.com/kloudlite/api/pkg/errors"
Expand Down Expand Up @@ -36,6 +37,15 @@ func main() {
return logging.New(&logging.Options{Name: "auth", Dev: isDev})
},
),

fx.Provide(func() *slog.Logger {
return logging.NewSlogLogger(logging.SlogOptions{
ShowCaller: true,
ShowDebugLogs: isDev,
SetAsDefaultLogger: true,
})
}),

framework.Module,
)

Expand Down
3 changes: 2 additions & 1 deletion apps/comms/internal/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package framework
import (
"context"
"fmt"
"log/slog"
"time"

"github.com/kloudlite/api/apps/comms/internal/app"
Expand Down Expand Up @@ -35,7 +36,7 @@ var Module = fx.Module(
return &fm{ev}
}),

fx.Provide(func(ev *env.Env, logger logging.Logger) (*nats.Client, error) {
fx.Provide(func(ev *env.Env, logger *slog.Logger) (*nats.Client, error) {
return nats.NewClient(ev.NatsURL, nats.ClientOpts{
Name: "comms",
Logger: logger,
Expand Down
18 changes: 13 additions & 5 deletions apps/comms/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"embed"
"flag"
"log/slog"
"os"
"time"

Expand All @@ -30,11 +31,18 @@ func main() {

webApp := fx.New(
fx.NopLogger,
fx.Provide(
func() logging.Logger {
return logger
},
),
fx.Provide(func() logging.Logger {
return logger
}),

fx.Provide(func() *slog.Logger {
return logging.NewSlogLogger(logging.SlogOptions{
ShowCaller: true,
ShowDebugLogs: isDev,
SetAsDefaultLogger: true,
})
}),

fx.Provide(func() (*env.Env, error) {
return env.LoadEnv()
}),
Expand Down
21 changes: 11 additions & 10 deletions apps/console/internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"context"
"log/slog"

"github.com/kloudlite/api/grpc-interfaces/kloudlite.io/rpc/console"
"github.com/kloudlite/api/pkg/k8s"
Expand Down Expand Up @@ -164,10 +165,10 @@ var Module = fx.Module("app",
}),

fx.Provide(func(jc *nats.JetstreamClient, ev *env.Env, logger logging.Logger) (ErrorOnApplyConsumer, error) {
topic := common.GetPlatformClusterMessagingTopic("*", "*", common.ConsoleReceiver, common.EventErrorOnApply)
topic := common.ReceiveFromAgentSubjectName(common.ReceiveFromAgentArgs{AccountName: "*", ClusterName: "*"}, common.ConsoleReceiver, common.EventErrorOnApply)
consumerName := "console:error-on-apply"
return msg_nats.NewJetstreamConsumer(context.TODO(), jc, msg_nats.JetstreamConsumerArgs{
Stream: ev.NatsResourceSyncStream,
Stream: ev.NatsReceiveFromAgentStream,
ConsumerConfig: msg_nats.ConsumerConfig{
Name: consumerName,
Durable: consumerName,
Expand All @@ -177,7 +178,7 @@ var Module = fx.Module("app",
})
}),

fx.Invoke(func(lf fx.Lifecycle, consumer ErrorOnApplyConsumer, d domain.Domain, logger logging.Logger) {
fx.Invoke(func(lf fx.Lifecycle, consumer ErrorOnApplyConsumer, d domain.Domain, logger *slog.Logger) {
lf.Append(fx.Hook{
OnStart: func(context.Context) error {
go ProcessErrorOnApply(consumer, d, logger)
Expand All @@ -190,11 +191,11 @@ var Module = fx.Module("app",
}),

fx.Provide(func(jc *nats.JetstreamClient, ev *env.Env, logger logging.Logger) (ResourceUpdateConsumer, error) {
topic := common.GetPlatformClusterMessagingTopic("*", "*", common.ConsoleReceiver, common.EventResourceUpdate)
topic := common.ReceiveFromAgentSubjectName(common.ReceiveFromAgentArgs{AccountName: "*", ClusterName: "*"}, common.ConsoleReceiver, common.EventResourceUpdate)

consumerName := "console:resource-updates"
return msg_nats.NewJetstreamConsumer(context.TODO(), jc, msg_nats.JetstreamConsumerArgs{
Stream: ev.NatsResourceSyncStream,
Stream: ev.NatsReceiveFromAgentStream,
ConsumerConfig: msg_nats.ConsumerConfig{
Name: consumerName,
Durable: consumerName,
Expand All @@ -204,7 +205,7 @@ var Module = fx.Module("app",
})
}),

fx.Invoke(func(lf fx.Lifecycle, consumer ResourceUpdateConsumer, d domain.Domain, logger logging.Logger) {
fx.Invoke(func(lf fx.Lifecycle, consumer ResourceUpdateConsumer, d domain.Domain, logger *slog.Logger) {
lf.Append(fx.Hook{
OnStart: func(context.Context) error {
go ProcessResourceUpdates(consumer, d, logger)
Expand All @@ -220,23 +221,23 @@ var Module = fx.Module("app",
return domain.NewSvcBindingDomain(svcBindingRepo)
}),

fx.Provide(func(logger logging.Logger, sbd domain.ServiceBindingDomain, ev *env.Env) *dnsHandler {
fx.Provide(func(logger *slog.Logger, sbd domain.ServiceBindingDomain, ev *env.Env) *dnsHandler {
return &dnsHandler{
logger: logger,
serviceBindingDomain: sbd,
kloudliteDNSSuffix: ev.KloudliteDNSSuffix,
}
}),

fx.Invoke(func(server *DNSServer, handler *dnsHandler, lf fx.Lifecycle, logger logging.Logger) {
fx.Invoke(func(server *DNSServer, handler *dnsHandler, lf fx.Lifecycle, logger *slog.Logger) {
lf.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
logger.Infof("starting dns server at %s", server.Addr)
server.Handler = handler
go func() {
logger.Info("starting dns server", "at", server.Addr)
err := server.ListenAndServe()
if err != nil {
logger.Errorf(err, "failed to start dns server")
logger.Error("failed to start dns server, got", "err", err)
panic(err)
}
}()
Expand Down
17 changes: 11 additions & 6 deletions apps/console/internal/app/dns-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ package app
import (
"context"
"fmt"
"log/slog"
"strings"
"time"

"github.com/kloudlite/api/apps/console/internal/domain"
"github.com/kloudlite/api/pkg/logging"
"github.com/kloudlite/operator/pkg/errors"
"github.com/miekg/dns"
)

type dnsHandler struct {
logger logging.Logger
logger *slog.Logger
serviceBindingDomain domain.ServiceBindingDomain
kloudliteDNSSuffix string
}
Expand All @@ -22,8 +23,10 @@ const (
)

func (h *dnsHandler) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
logger := h.logger.WithKV("query", r.Question[0].Name)
logger.Debugf("incoming dns request")
logger := h.logger.With("query", r.Question[0].Name)
logger.Debug("INCOMING dns request")
start := time.Now()

msg := new(dns.Msg)
msg.SetReply(r)
msg.Authoritative = true
Expand All @@ -34,15 +37,17 @@ func (h *dnsHandler) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
for _, question := range r.Question {
answers, err := h.resolver(ctx, question.Name, question.Qtype)
if err != nil {
h.logger.Errorf(err)
logger.Error("FAILED to resolve dns record, got", "err", err, "question", question.Name)
msg.Rcode = dns.RcodeNameError
continue
}
msg.Answer = append(msg.Answer, answers...)
}

w.WriteMsg(msg)
logger.WithKV("answers", msg.Answer).Debugf("outgoing dns request")
if msg.Rcode != dns.RcodeNameError {
logger.Info("RESOLVED dns request", "answers", msg.Answer, "took", fmt.Sprintf("%.2fs", time.Since(start).Seconds()))
}
}

func (h *dnsHandler) newRR(domain string, ttl int, ip string) ([]dns.RR, error) {
Expand Down
36 changes: 22 additions & 14 deletions apps/console/internal/app/process-error-on-apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"encoding/json"
"fmt"
"log/slog"
"sync"
"time"

t "github.com/kloudlite/api/apps/tenant-agent/types"
"github.com/kloudlite/api/pkg/errors"
Expand All @@ -13,17 +16,14 @@ import (
"github.com/kloudlite/api/apps/console/internal/entities"
msgOfficeT "github.com/kloudlite/api/apps/message-office/types"
fn "github.com/kloudlite/api/pkg/functions"
"github.com/kloudlite/api/pkg/logging"
"github.com/kloudlite/api/pkg/messaging"
msgTypes "github.com/kloudlite/api/pkg/messaging/types"
crdsv1 "github.com/kloudlite/operator/apis/crds/v1"
)

type ErrorOnApplyConsumer messaging.Consumer

func ProcessErrorOnApply(consumer ErrorOnApplyConsumer, d domain.Domain, logger logging.Logger) {
counter := 0

func ProcessErrorOnApply(consumer ErrorOnApplyConsumer, d domain.Domain, logger *slog.Logger) {
getEnvironmentResourceContext := func(ctx domain.ConsoleContext, resType entities.ResourceType, clusterName string, obj unstructured.Unstructured) (domain.ResourceContext, error) {
mapping, err := d.GetEnvironmentResourceMapping(ctx, resType, clusterName, obj.GetNamespace(), obj.GetName())
if err != nil {
Expand All @@ -35,9 +35,18 @@ func ProcessErrorOnApply(consumer ErrorOnApplyConsumer, d domain.Domain, logger
return newResourceContext(ctx, mapping.EnvironmentName), nil
}

counter := 0
mu := sync.Mutex{}

msgReader := func(msg *msgTypes.ConsumeMsg) error {
mu.Lock()
counter += 1
logger.Debugf("received message [%d]", counter)
mu.Unlock()

start := time.Now()

logger := logger.With("subject", msg.Subject, "counter", counter)
logger.Debug("INCOMING message", "counter", counter)

em, err := msgOfficeT.UnmarshalErrMessage(msg.Payload)
if err != nil {
Expand All @@ -50,25 +59,24 @@ func ProcessErrorOnApply(consumer ErrorOnApplyConsumer, d domain.Domain, logger
}

obj := unstructured.Unstructured{Object: errObj.Object}
gvkStr := obj.GroupVersionKind().String()

mLogger := logger.WithKV(
"gvk", obj.GroupVersionKind(),
"nn", fmt.Sprintf("%s/%s", obj.GetNamespace(), obj.GetName()),
mlogger := logger.With(
"GVK", gvkStr,
"NN", fmt.Sprintf("%s/%s", obj.GetNamespace(), obj.GetName()),
"accountName", em.AccountName,
"clusterName", em.ClusterName,
)

mLogger.Infof("received message")
mlogger.Info("validated message")
defer func() {
mLogger.Infof("processed message")
mlogger.Info("PROCESSED message", "took", fmt.Sprintf("%.2fs", time.Since(start).Seconds()))
}()

dctx := domain.NewConsoleContext(context.TODO(), "sys-user:apply-on-error-worker", em.AccountName)

opts := domain.UpdateAndDeleteOpts{MessageTimestamp: msg.Timestamp}

gvkStr := obj.GroupVersionKind().String()

switch gvkStr {
case environmentGVK.String():
{
Expand Down Expand Up @@ -204,10 +212,10 @@ func ProcessErrorOnApply(consumer ErrorOnApplyConsumer, d domain.Domain, logger

if err := consumer.Consume(msgReader, msgTypes.ConsumeOpts{
OnError: func(err error) error {
logger.Errorf(err, "received while reading messages, ignoring it")
logger.Error("while reading messages, got", "err", err)
return nil
},
}); err != nil {
logger.Errorf(err, "error while consuming messages")
logger.Error("while consuming messages, got", "err", err)
}
}
Loading