Skip to content
This repository was archived by the owner on Jun 11, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions apps/message-office/internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ var Module = fx.Module("app",

fx.Provide(func(restCfg *rest.Config) (kubectl.ControllerClient, error) {
scheme := runtime.NewScheme()
artifactsv1.AddToScheme(scheme)
if err := artifactsv1.AddToScheme(scheme); err != nil {
return nil, err
}
return kubectl.NewClientWithScheme(restCfg, scheme)
}),

Expand All @@ -50,10 +52,7 @@ var Module = fx.Module("app",
repos.NewFxMongoRepo[*domain.MessageOfficeToken]("mo_tokens", "mot", domain.MOTokenIndexes),
repos.NewFxMongoRepo[*domain.AccessToken]("acc_tokens", "acct", domain.AccessTokenIndexes),
fx.Invoke(
func(
server *fiber.App,
d domain.Domain,
) {
func(server *fiber.App, d domain.Domain) {
schema := generated.NewExecutableSchema(
generated.Config{
Resolvers: &graph.Resolver{Domain: d},
Expand Down
238 changes: 133 additions & 105 deletions apps/message-office/internal/app/grpc-server.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package app

import (
context "context"
"context"
"fmt"
"time"

artifactsv1 "github.com/kloudlite/operator/apis/artifacts/v1"
"github.com/kloudlite/operator/grpc-interfaces/grpc/messages"
"github.com/kloudlite/operator/pkg/kubectl"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"

Expand Down Expand Up @@ -39,60 +40,78 @@ type grpcServer struct {
byocClientUpdatesCounter int64
}

// ReceiveErrors implements messages.MessageDispatchServiceServer
func (g *grpcServer) ReceiveErrors(server messages.MessageDispatchService_ReceiveErrorsServer) error {
for {
errorMsg, err := server.Recv()
func (g *grpcServer) GetDockerCredentials(ctx context.Context, in *messages.GetDockerCredentialsIn) (*messages.GetDockerCredentialsOut, error) {
g.logger.Infof("request received for docker credentials for account=%q, cluster=%q", in.AccountName, in.ClusterName)
defer func() {
g.logger.Infof("request processed for docker credentials for account=%q, cluster=%q", in.AccountName, in.ClusterName)
}()

if err := g.domain.ValidateAccessToken(ctx, in.AccessToken, in.AccountName, in.ClusterName); err != nil {
return nil, err
}

var hu artifactsv1.HarborUserAccount
if err := g.k8sControllerCli.Get(ctx, types.NamespacedName{Namespace: constants.NamespaceCore, Name: in.AccountName}, &hu); err != nil {
return nil, err
}

var harborSecret corev1.Secret
if err := g.k8sControllerCli.Get(ctx, types.NamespacedName{Namespace: constants.NamespaceCore, Name: hu.Spec.TargetSecret}, &harborSecret); err != nil {
return nil, err
}

return &messages.GetDockerCredentialsOut{DockerConfigJson: string(harborSecret.Data[".dockerconfigjson"])}, nil
}

func (g *grpcServer) parseError(ctx context.Context, errMsg *messages.ErrorData) (err error) {
g.errorMessagesCounter++
logger := g.logger.WithKV("accountName", errMsg.AccountName).WithKV("cluster", errMsg.ClusterName)

logger.Infof("[%v] received error-on-apply message", g.errorMessagesCounter)
defer func() {
if err != nil {
return err
err = errors.Wrap(err, fmt.Sprintf("[%v] (with ERROR) processed error-on-apply message", g.byocClientUpdatesCounter))
logger.Errorf(err)
return
}
logger.Infof("[%v] processed error-on-apply message", g.infraUpdatesCounter)
}()

g.errorMessagesCounter++
g.logger.Infof("[%v] received error-on-apply message", g.errorMessagesCounter)
g.logger.Infof("[%v] [error]: %s\n", g.errorMessagesCounter, errorMsg.Data)
if err := g.domain.ValidateAccessToken(ctx, errMsg.AccessToken, errMsg.AccountName, errMsg.ClusterName); err != nil {
return errors.Wrap(err, "while validating access token")
}

if err := g.domain.ValidateAccessToken(server.Context(), errorMsg.AccessToken, errorMsg.AccountName, errorMsg.ClusterName); err != nil {
g.logger.Errorf(err, fmt.Sprintf("[%v] ERROR while validating access token", g.resourceUpdatesCounter))
return err
}
if _, err := g.producer.Produce(ctx, g.ev.KafkaTopicErrorOnApply, errMsg.ClusterName, errMsg.Data); err != nil {
return errors.Wrap(err, fmt.Sprintf("while producing to topic (%s)", g.ev.KafkaTopicErrorOnApply))
}
logger.Infof("[%v] dispatched error-on-apply message", g.errorMessagesCounter)
return nil
}

po, err := g.producer.Produce(server.Context(), g.ev.KafkaTopicErrorOnApply, errorMsg.ClusterName, errorMsg.Data)
// ReceiveErrors implements messages.MessageDispatchServiceServer
func (g *grpcServer) ReceiveErrors(server messages.MessageDispatchService_ReceiveErrorsServer) error {
for {
errorMsg, err := server.Recv()
if err != nil {
g.logger.Errorf(err, fmt.Sprintf("[%v] ERROR while producing to topic (%s)", g.resourceUpdatesCounter, g.ev.KafkaTopicErrorOnApply))
return err
}
g.logger.WithKV("topic", g.ev.KafkaTopicErrorOnApply).
WithKV("parition", po.Partition).
WithKV("offset", po.Offset).
Infof("%v dispatched error-on-apply message", g.errorMessagesCounter)
_ = g.parseError(server.Context(), errorMsg)
}
}

// GetAccessToken implements messages.MessageDispatchServiceServer
func (g *grpcServer) GetAccessToken(ctx context.Context, msg *messages.GetClusterTokenIn) (*messages.GetClusterTokenOut, error) {
g.logger.Infof("request received for clustertoken: %s", msg.ClusterToken)
defer func() {
g.logger.Infof("request processed for clustertoken: %s", msg.ClusterToken)
}()
g.logger.Infof("request received for cluster-token (%q) exchange", msg.ClusterToken)

record, err := g.domain.GenAccessToken(ctx, msg.ClusterToken)
if err != nil {
return nil, err
}

var hu artifactsv1.HarborUserAccount
if err := g.k8sControllerCli.Get(ctx, types.NamespacedName{Namespace: constants.NamespaceCore, Name: record.AccountName}, &hu); err != nil {
return nil, err
}

var harborSecret corev1.Secret
if err := g.k8sControllerCli.Get(ctx, types.NamespacedName{Namespace: constants.NamespaceCore, Name: hu.Spec.TargetSecret}, &harborSecret); err != nil {
return nil, err
}
g.logger.Infof("SUCCESSFUL cluster-token (%q) exchange for account=%q, cluster=%q", msg.ClusterToken, record.AccountName, record.ClusterName)

return &messages.GetClusterTokenOut{
AccessToken: record.AccessToken,
HarborDockerConfigJson: string(harborSecret.Data[".dockerconfigjson"]),
AccessToken: record.AccessToken,
}, nil
}

Expand All @@ -107,10 +126,7 @@ func (g *grpcServer) createConsumer(ev *env.Env, topicName string) (redpanda.Con
}, []string{topicName})
}

func (g grpcServer) SendActions(
request *messages.StreamActionsRequest,
server messages.MessageDispatchService_SendActionsServer,
) error {
func (g *grpcServer) SendActions(request *messages.StreamActionsRequest, server messages.MessageDispatchService_SendActionsServer) error {
if err := g.domain.ValidateAccessToken(server.Context(), request.AccessToken, request.AccountName, request.ClusterName); err != nil {
return err
}
Expand Down Expand Up @@ -145,12 +161,6 @@ func (g grpcServer) SendActions(
consumer.Close()
}()

defer func() {
g.logger.Debugf("kafka consumer has been closed")
delete(g.consumers, key)
consumer.Close()
}()

consumer.StartConsumingSync(func(msg []byte, timeStamp time.Time, offset int64) error {
g.logger.WithKV("timestamp", timeStamp).Infof("received message")
defer func() {
Expand All @@ -161,29 +171,66 @@ func (g grpcServer) SendActions(
return nil
}

func (g *grpcServer) processResourceUpdate(ctx context.Context, msg *messages.ResourceUpdate) (err error) {
g.resourceUpdatesCounter++

logger := g.logger.WithKV("accountName", msg.AccountName).WithKV("clusterName", msg.ClusterName)
logger.Infof("[%v] received resource status update", g.resourceUpdatesCounter)
defer func() {
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("[%v] (with ERROR) processed resource status update", g.byocClientUpdatesCounter))
logger.Errorf(err)
return
}
logger.Infof("[%v] processed resource status update", g.infraUpdatesCounter)
}()

if err = g.domain.ValidateAccessToken(ctx, msg.AccessToken, msg.AccountName, msg.ClusterName); err != nil {
return errors.Wrap(err, fmt.Sprintf("[%v] while validating accessToken", g.resourceUpdatesCounter))
}

if _, err := g.producer.Produce(ctx, g.ev.KafkaTopicStatusUpdates, msg.ClusterName, msg.Message); err != nil {
return errors.Wrap(err, fmt.Sprintf("while producing resource update to topic %q", g.ev.KafkaTopicStatusUpdates))
}

logger.Infof("[%v] dispatched status updates to topic %q", g.resourceUpdatesCounter, g.ev.KafkaTopicStatusUpdates)
return nil
}

func (g *grpcServer) ReceiveResourceUpdates(server messages.MessageDispatchService_ReceiveResourceUpdatesServer) error {
for {
statusMsg, err := server.Recv()
if err != nil {
return err
}
_ = g.processResourceUpdate(server.Context(), statusMsg)
}
}

g.resourceUpdatesCounter++
g.logger.Infof("[%v] received status update", g.resourceUpdatesCounter)

if err = g.domain.ValidateAccessToken(server.Context(), statusMsg.AccessToken, statusMsg.AccountName, statusMsg.ClusterName); err != nil {
g.logger.Errorf(err, fmt.Sprintf("[%v] ERROR while processing resource update", g.resourceUpdatesCounter))
return err
}
func (g *grpcServer) processBYOCClientUpdate(ctx context.Context, msg *messages.BYOCClientUpdate) (err error) {
g.byocClientUpdatesCounter++
logger := g.logger.WithKV("accountName", msg.AccountName).WithKV("clusterName", msg.ClusterName)

po, err := g.producer.Produce(server.Context(), g.ev.KafkaTopicStatusUpdates, statusMsg.ClusterName, statusMsg.Message)
logger.Infof("[%v] received BYOC client update", g.byocClientUpdatesCounter)
defer func() {
if err != nil {
g.logger.Errorf(err, fmt.Sprintf("[%v] ERROR while processing resource update", g.resourceUpdatesCounter))
return err
err = errors.Wrap(err, fmt.Sprintf("[%v] (with ERROR) processed BYOC client update", g.byocClientUpdatesCounter))
logger.Errorf(err)
return
}
g.logger.Infof("[%v] processed status update", g.resourceUpdatesCounter)
g.logger.WithKV("topic", g.ev.KafkaTopicStatusUpdates).WithKV("parition", po.Partition).WithKV("offset", po.Offset).Infof("%v dispatched status updates", g.resourceUpdatesCounter)
logger.Infof("[%v] processed BYOC client update", g.infraUpdatesCounter)
}()

if err = g.domain.ValidateAccessToken(ctx, msg.AccessToken, msg.AccountName, msg.ClusterName); err != nil {
return errors.Wrap(err, "while validating access token")
}

if _, err := g.producer.Produce(ctx, g.ev.KafkaTopicBYOCClientUpdates, msg.ClusterName, msg.Message); err != nil {
return errors.Wrap(err, fmt.Sprintf("while producing message into kafka topic (%s) for ", g.ev.KafkaTopicBYOCClientUpdates))
}

logger.Infof("%v dispatched byoc client updates into topic=%q", g.byocClientUpdatesCounter, g.ev.KafkaTopicBYOCClientUpdates)
return nil
}

func (g *grpcServer) ReceiveBYOCClientUpdates(server messages.MessageDispatchService_ReceiveBYOCClientUpdatesServer) (err error) {
Expand All @@ -193,67 +240,48 @@ func (g *grpcServer) ReceiveBYOCClientUpdates(server messages.MessageDispatchSer
return err
}

g.byocClientUpdatesCounter++
g.logger.Infof("[%v] received byoc client update", g.byocClientUpdatesCounter)
_ = g.processBYOCClientUpdate(server.Context(), clientUpdateMsg)
}
}

func (g *grpcServer) processInfraUpdate(ctx context.Context, msg *messages.InfraUpdate) (err error) {
g.infraUpdatesCounter++

if err = g.domain.ValidateAccessToken(server.Context(), clientUpdateMsg.AccessToken, clientUpdateMsg.AccountName, clientUpdateMsg.ClusterName); err != nil {
g.logger.Errorf(err, fmt.Sprintf("[%v] ERROR while processing BYOC Client update message", g.resourceUpdatesCounter))
return err
}
logger := g.logger.WithKV("accountName", msg.AccountName).WithKV("clusterName", msg.ClusterName)

po, err := g.producer.Produce(server.Context(), g.ev.KafkaTopicBYOCClientUpdates, clientUpdateMsg.ClusterName, clientUpdateMsg.Message)
logger.Infof("[%v] received infra update", g.infraUpdatesCounter)
defer func() {
if err != nil {
g.logger.Errorf(err, fmt.Sprintf("[%v] ERROR while processing BYOC Client update message", g.resourceUpdatesCounter))
return err
err = errors.Wrap(err, fmt.Sprintf("[%v] (with ERROR) processed infra update", g.infraUpdatesCounter))
g.logger.Errorf(err)
return
}
g.logger.Infof("[%v] processed BYOC Client ClientUpdate", g.byocClientUpdatesCounter)
g.logger.WithKV("topic", g.ev.KafkaTopicBYOCClientUpdates).
WithKV("parition", po.Partition).
WithKV("offset", po.Offset).
Infof("%v dispatched byoc client updates", g.byocClientUpdatesCounter)
g.logger.Infof("[%v] processed infra update", g.infraUpdatesCounter)
}()

if err := g.domain.ValidateAccessToken(ctx, msg.AccessToken, msg.AccountName, msg.ClusterName); err != nil {
return err
}

po, err := g.producer.Produce(ctx, g.ev.KafkaTopicInfraUpdates, msg.ClusterName, msg.Message)
if err != nil {
return err
}

g.logger.WithKV("topic", g.ev.KafkaTopicInfraUpdates).
WithKV("partition", po.Partition).
WithKV("offset", po.Offset).
Infof("%v dispatched infra updates", g.infraUpdatesCounter)
return nil
}

// ReceiveInfraUpdates implements messages.MessageDispatchServiceServer
func (g *grpcServer) ReceiveInfraUpdates(
server messages.MessageDispatchService_ReceiveInfraUpdatesServer,
) (err error) {
func (g *grpcServer) ReceiveInfraUpdates(server messages.MessageDispatchService_ReceiveInfraUpdatesServer) (err error) {
for {
statusMsg, err := server.Recv()
if err != nil {
return err
}

g.infraUpdatesCounter++
g.logger.Infof("%v received infra update", g.infraUpdatesCounter)

defer func() {
if err != nil {
g.logger.Errorf(
err,
fmt.Sprintf("[%v] ERROR while processing infra update", g.infraUpdatesCounter),
)
return
}
g.logger.Infof("[%v] processed infra update", g.infraUpdatesCounter)
}()

if err := g.domain.ValidateAccessToken(server.Context(), statusMsg.AccessToken, statusMsg.AccountName, statusMsg.ClusterName); err != nil {
return err
}

po, err := g.producer.Produce(
server.Context(),
g.ev.KafkaTopicInfraUpdates,
statusMsg.ClusterName,
statusMsg.Message,
)
if err != nil {
return err
}
g.logger.WithKV("topic", g.ev.KafkaTopicInfraUpdates).
WithKV("parition", po.Partition).
WithKV("offset", po.Offset).
Infof("%v dispatched infra updates", g.infraUpdatesCounter)
_ = g.processInfraUpdate(server.Context(), statusMsg)
}
}
2 changes: 0 additions & 2 deletions apps/message-office/internal/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ type domain struct {
accessTokenRepo repos.DbRepo[*AccessToken]
}

// ValidationAccessToken implements Domain
func (d *domain) ValidateAccessToken(ctx context.Context, accessToken string, accountName string, clusterName string) error {
r, err := d.accessTokenRepo.FindOne(ctx, repos.Filter{
"accessToken": accessToken,
Expand Down Expand Up @@ -82,7 +81,6 @@ func (d *domain) GenAccessToken(ctx context.Context, clusterToken string) (*Acce
return nil, fmt.Errorf("a valid access-token has already been issued for this cluster token")
}


record, err := d.accessTokenRepo.Upsert(ctx, repos.Filter{
"accountName": mot.AccountName,
"clusterName": mot.ClusterName,
Expand Down
2 changes: 1 addition & 1 deletion apps/message-office/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"go.uber.org/fx"
"k8s.io/client-go/rest"

env "kloudlite.io/apps/message-office/internal/env"
"kloudlite.io/apps/message-office/internal/env"
"kloudlite.io/apps/message-office/internal/framework"
"kloudlite.io/pkg/k8s"
"kloudlite.io/pkg/logging"
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/google/go-github/v45 v45.2.0
github.com/gorilla/websocket v1.5.0
github.com/kloudlite/cluster-operator v0.0.0-20230329090334-40fc9f00d55e
github.com/kloudlite/operator v0.0.0-20230510120745-e9e57e63e0e6
github.com/kloudlite/operator v0.0.0-20230519115853-9aa81fd2e1f7
github.com/kloudlite/wg-operator v0.0.0-20230329090407-183297dc23b8
github.com/matoous/go-nanoid/v2 v2.0.0
github.com/miekg/dns v1.1.41
Expand Down Expand Up @@ -172,7 +172,7 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
)

// replace github.com/kloudlite/operator v0.0.0-20230510063950-30b6ae214d30 => /home/nxtcoder17/workspace/kloudlite-forks/kloudlite-operator
// replace github.com/kloudlite/operator v0.0.0-20230515115651-baa43440ec41 => /home/nxtcoder17/workspace/kloudlite-forks/kloudlite-operator

// replace github.com/kloudlite/cluster-operator v0.0.0-20230213105023-96cd6d1e38d3 => /home/vision/kloudlite/cluster-operator
// replace github.com/kloudlite/cluster-operator v0.0.0-20230213105023-96cd6d1e38d3 => /home/nxtcoder17/workspace/kloudlite/cluster-operator
Expand Down
Loading