From 71e7d1c7d5fa8ae2b5ce7141acc3b2f203eebe41 Mon Sep 17 00:00:00 2001 From: nxtcoder17 Date: Thu, 24 Oct 2024 23:10:38 +0530 Subject: [PATCH 1/2] feat: tenant-agent now syncs Cluster Gateway resource prior to streaming actions gh-issue: kloudlite/kloudlite#299, kloudlite/kloudlite#300, kloudlite/kloudlite#301 --- apps/infra/internal/app/grpc-server.go | 18 ++ apps/infra/internal/domain/api.go | 3 + .../domain/global-vpn-cluster-connection.go | 17 ++ apps/infra/protobufs/infra.proto | 11 + apps/infra/protobufs/infra/infra.pb.go | 229 ++++++++++++++---- apps/infra/protobufs/infra/infra_grpc.pb.go | 37 +++ apps/message-office/internal/app/app.go | 2 +- .../internal/app/grpc-server.go | 26 +- apps/tenant-agent/main.go | 64 +++-- pkg/functions/k8s-utils.go | 3 +- 10 files changed, 346 insertions(+), 64 deletions(-) diff --git a/apps/infra/internal/app/grpc-server.go b/apps/infra/internal/app/grpc-server.go index 468749176..352f7e9a9 100644 --- a/apps/infra/internal/app/grpc-server.go +++ b/apps/infra/internal/app/grpc-server.go @@ -22,6 +22,24 @@ type grpcServer struct { logger *slog.Logger } +// GetClusterGatewayResource implements infra.InfraServer. +func (g *grpcServer) GetClusterGatewayResource(ctx context.Context, in *infra.GetClusterGatewayResourceIn) (*infra.GetClusterGatewayResourceOut, error) { + l := grpc.NewRequestLogger(g.logger, "EnsureGlobalVPNConnection") + defer l.End() + + gw, err := g.d.GetGatewayResource(ctx, in.AccountName, in.ClusterName) + if err != nil { + return nil, err + } + + b, err := fn.K8sObjToYAML(gw) + if err != nil { + return nil, errors.NewE(err) + } + + return &infra.GetClusterGatewayResourceOut{Gateway: b}, nil +} + // EnsureGlobalVPNConnection implements infra.InfraServer. func (g *grpcServer) EnsureGlobalVPNConnection(ctx context.Context, in *infra.EnsureGlobalVPNConnectionIn) (*infra.EnsureGlobalVPNConnectionOut, error) { l := grpc.NewRequestLogger(g.logger, "EnsureGlobalVPNConnection") diff --git a/apps/infra/internal/domain/api.go b/apps/infra/internal/domain/api.go index 52848a237..382356af8 100644 --- a/apps/infra/internal/domain/api.go +++ b/apps/infra/internal/domain/api.go @@ -4,6 +4,7 @@ import ( "context" "time" + klNetworkingv1 "github.com/kloudlite/operator/apis/networking/v1" networkingv1 "k8s.io/api/networking/v1" "github.com/kloudlite/api/apps/infra/internal/entities" @@ -61,6 +62,8 @@ type Domain interface { ListGlobalVPN(ctx InfraContext, search map[string]repos.MatchFilter, pagination repos.CursorPagination) (*repos.PaginatedRecord[*entities.GlobalVPN], error) GetGlobalVPN(ctx InfraContext, name string) (*entities.GlobalVPN, error) + GetGatewayResource(ctx context.Context, accountName string, clusterName string) (*klNetworkingv1.Gateway, error) + CreateGlobalVPNDevice(ctx InfraContext, device entities.GlobalVPNDevice) (*entities.GlobalVPNDevice, error) UpdateGlobalVPNDevice(ctx InfraContext, device entities.GlobalVPNDevice) (*entities.GlobalVPNDevice, error) DeleteGlobalVPNDevice(ctx InfraContext, gvpn string, device string) error diff --git a/apps/infra/internal/domain/global-vpn-cluster-connection.go b/apps/infra/internal/domain/global-vpn-cluster-connection.go index a141e844c..a9d5dd0d5 100644 --- a/apps/infra/internal/domain/global-vpn-cluster-connection.go +++ b/apps/infra/internal/domain/global-vpn-cluster-connection.go @@ -89,6 +89,23 @@ func (d *domain) getGlobalVPNConnectionPeers(args getGlobalVPNConnectionPeersArg return peers } +// GetGatewayResource implements Domain. +func (d *domain) GetGatewayResource(ctx context.Context, accountName string, clusterName string) (*networkingv1.Gateway, error) { + gw, err := d.gvpnConnRepo.FindOne(ctx, repos.Filter{ + fc.AccountName: accountName, + fc.ClusterName: clusterName, + }) + if err != nil { + return nil, err + } + + if gw == nil { + return nil, fmt.Errorf("failed to find gateway resource") + } + + return &gw.Gateway, nil +} + func (d *domain) listGlobalVPNConnections(ctx InfraContext, vpnName string) ([]*entities.GlobalVPNConnection, error) { return d.gvpnConnRepo.Find(ctx, repos.Query{ Filter: repos.Filter{ diff --git a/apps/infra/protobufs/infra.proto b/apps/infra/protobufs/infra.proto index a6c8cc719..fc36bdeff 100644 --- a/apps/infra/protobufs/infra.proto +++ b/apps/infra/protobufs/infra.proto @@ -13,6 +13,8 @@ service Infra { rpc MarkClusterOnlineAt(MarkClusterOnlineAtIn) returns (MarkClusterOnlineAtOut); rpc EnsureGlobalVPNConnection(EnsureGlobalVPNConnectionIn) returns (EnsureGlobalVPNConnectionOut); + + rpc GetClusterGatewayResource(GetClusterGatewayResourceIn) returns (GetClusterGatewayResourceOut); } message GetClusterIn { @@ -87,3 +89,12 @@ message EnsureGlobalVPNConnectionIn { } message EnsureGlobalVPNConnectionOut {} + +message GetClusterGatewayResourceIn { + string accountName = 1; + string clusterName = 2; +} + +message GetClusterGatewayResourceOut { + bytes gateway = 1; +} diff --git a/apps/infra/protobufs/infra/infra.pb.go b/apps/infra/protobufs/infra/infra.pb.go index b81be3496..b66bee2b5 100644 --- a/apps/infra/protobufs/infra/infra.pb.go +++ b/apps/infra/protobufs/infra/infra.pb.go @@ -736,6 +736,108 @@ func (*EnsureGlobalVPNConnectionOut) Descriptor() ([]byte, []int) { return file_infra_proto_rawDescGZIP(), []int{10} } +type GetClusterGatewayResourceIn struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + AccountName string `protobuf:"bytes,1,opt,name=accountName,proto3" json:"accountName,omitempty"` + ClusterName string `protobuf:"bytes,2,opt,name=clusterName,proto3" json:"clusterName,omitempty"` +} + +func (x *GetClusterGatewayResourceIn) Reset() { + *x = GetClusterGatewayResourceIn{} + if protoimpl.UnsafeEnabled { + mi := &file_infra_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetClusterGatewayResourceIn) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetClusterGatewayResourceIn) ProtoMessage() {} + +func (x *GetClusterGatewayResourceIn) ProtoReflect() protoreflect.Message { + mi := &file_infra_proto_msgTypes[11] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetClusterGatewayResourceIn.ProtoReflect.Descriptor instead. +func (*GetClusterGatewayResourceIn) Descriptor() ([]byte, []int) { + return file_infra_proto_rawDescGZIP(), []int{11} +} + +func (x *GetClusterGatewayResourceIn) GetAccountName() string { + if x != nil { + return x.AccountName + } + return "" +} + +func (x *GetClusterGatewayResourceIn) GetClusterName() string { + if x != nil { + return x.ClusterName + } + return "" +} + +type GetClusterGatewayResourceOut struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Gateway []byte `protobuf:"bytes,1,opt,name=gateway,proto3" json:"gateway,omitempty"` +} + +func (x *GetClusterGatewayResourceOut) Reset() { + *x = GetClusterGatewayResourceOut{} + if protoimpl.UnsafeEnabled { + mi := &file_infra_proto_msgTypes[12] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetClusterGatewayResourceOut) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetClusterGatewayResourceOut) ProtoMessage() {} + +func (x *GetClusterGatewayResourceOut) ProtoReflect() protoreflect.Message { + mi := &file_infra_proto_msgTypes[12] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetClusterGatewayResourceOut.ProtoReflect.Descriptor instead. +func (*GetClusterGatewayResourceOut) Descriptor() ([]byte, []int) { + return file_infra_proto_rawDescGZIP(), []int{12} +} + +func (x *GetClusterGatewayResourceOut) GetGateway() []byte { + if x != nil { + return x.Gateway + } + return nil +} + var File_infra_proto protoreflect.FileDescriptor var file_infra_proto_rawDesc = []byte{ @@ -834,35 +936,50 @@ var file_infra_proto_rawDesc = []byte{ 0x68, 0x41, 0x64, 0x64, 0x72, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x1e, 0x0a, 0x1c, 0x45, 0x6e, 0x73, 0x75, 0x72, 0x65, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x56, 0x50, 0x4e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x75, 0x74, - 0x32, 0xae, 0x03, 0x0a, 0x05, 0x49, 0x6e, 0x66, 0x72, 0x61, 0x12, 0x2b, 0x0a, 0x0a, 0x47, 0x65, - 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, 0x0d, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6c, - 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x1a, 0x0e, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, - 0x73, 0x74, 0x65, 0x72, 0x4f, 0x75, 0x74, 0x12, 0x2f, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x42, 0x79, - 0x6f, 0x6b, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, 0x0d, 0x2e, 0x47, 0x65, 0x74, 0x43, - 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x1a, 0x0e, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6c, - 0x75, 0x73, 0x74, 0x65, 0x72, 0x4f, 0x75, 0x74, 0x12, 0x2e, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x4e, - 0x6f, 0x64, 0x65, 0x70, 0x6f, 0x6f, 0x6c, 0x12, 0x0e, 0x2e, 0x47, 0x65, 0x74, 0x4e, 0x6f, 0x64, - 0x65, 0x70, 0x6f, 0x6f, 0x6c, 0x49, 0x6e, 0x1a, 0x0f, 0x2e, 0x47, 0x65, 0x74, 0x4e, 0x6f, 0x64, - 0x65, 0x70, 0x6f, 0x6f, 0x6c, 0x4f, 0x75, 0x74, 0x12, 0x34, 0x0a, 0x0d, 0x43, 0x6c, 0x75, 0x73, - 0x74, 0x65, 0x72, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, 0x12, 0x10, 0x2e, 0x43, 0x6c, 0x75, 0x73, - 0x74, 0x65, 0x72, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, 0x49, 0x6e, 0x1a, 0x11, 0x2e, 0x43, 0x6c, - 0x75, 0x73, 0x74, 0x65, 0x72, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, 0x4f, 0x75, 0x74, 0x12, 0x3f, - 0x0a, 0x14, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4b, 0x75, 0x62, 0x65, - 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x0d, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, - 0x74, 0x65, 0x72, 0x49, 0x6e, 0x1a, 0x18, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, - 0x65, 0x72, 0x4b, 0x75, 0x62, 0x65, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x4f, 0x75, 0x74, 0x12, - 0x46, 0x0a, 0x13, 0x4d, 0x61, 0x72, 0x6b, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4f, 0x6e, - 0x6c, 0x69, 0x6e, 0x65, 0x41, 0x74, 0x12, 0x16, 0x2e, 0x4d, 0x61, 0x72, 0x6b, 0x43, 0x6c, 0x75, - 0x73, 0x74, 0x65, 0x72, 0x4f, 0x6e, 0x6c, 0x69, 0x6e, 0x65, 0x41, 0x74, 0x49, 0x6e, 0x1a, 0x17, - 0x2e, 0x4d, 0x61, 0x72, 0x6b, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4f, 0x6e, 0x6c, 0x69, - 0x6e, 0x65, 0x41, 0x74, 0x4f, 0x75, 0x74, 0x12, 0x58, 0x0a, 0x19, 0x45, 0x6e, 0x73, 0x75, 0x72, - 0x65, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x56, 0x50, 0x4e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, - 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x2e, 0x45, 0x6e, 0x73, 0x75, 0x72, 0x65, 0x47, 0x6c, 0x6f, - 0x62, 0x61, 0x6c, 0x56, 0x50, 0x4e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, - 0x49, 0x6e, 0x1a, 0x1d, 0x2e, 0x45, 0x6e, 0x73, 0x75, 0x72, 0x65, 0x47, 0x6c, 0x6f, 0x62, 0x61, - 0x6c, 0x56, 0x50, 0x4e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x75, - 0x74, 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x2f, 0x69, 0x6e, 0x66, 0x72, 0x61, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x22, 0x61, 0x0a, 0x1b, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x47, 0x61, + 0x74, 0x65, 0x77, 0x61, 0x79, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x6e, 0x12, + 0x20, 0x0a, 0x0b, 0x61, 0x63, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x63, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x4e, 0x61, 0x6d, + 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4e, + 0x61, 0x6d, 0x65, 0x22, 0x38, 0x0a, 0x1c, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, + 0x72, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x4f, 0x75, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x67, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x67, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x32, 0x88, 0x04, + 0x0a, 0x05, 0x49, 0x6e, 0x66, 0x72, 0x61, 0x12, 0x2b, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x43, 0x6c, + 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, 0x0d, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, + 0x65, 0x72, 0x49, 0x6e, 0x1a, 0x0e, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, + 0x72, 0x4f, 0x75, 0x74, 0x12, 0x2f, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x42, 0x79, 0x6f, 0x6b, 0x43, + 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, 0x0d, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, + 0x74, 0x65, 0x72, 0x49, 0x6e, 0x1a, 0x0e, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, + 0x65, 0x72, 0x4f, 0x75, 0x74, 0x12, 0x2e, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x4e, 0x6f, 0x64, 0x65, + 0x70, 0x6f, 0x6f, 0x6c, 0x12, 0x0e, 0x2e, 0x47, 0x65, 0x74, 0x4e, 0x6f, 0x64, 0x65, 0x70, 0x6f, + 0x6f, 0x6c, 0x49, 0x6e, 0x1a, 0x0f, 0x2e, 0x47, 0x65, 0x74, 0x4e, 0x6f, 0x64, 0x65, 0x70, 0x6f, + 0x6f, 0x6c, 0x4f, 0x75, 0x74, 0x12, 0x34, 0x0a, 0x0d, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, + 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, 0x12, 0x10, 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, + 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, 0x49, 0x6e, 0x1a, 0x11, 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74, + 0x65, 0x72, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, 0x4f, 0x75, 0x74, 0x12, 0x3f, 0x0a, 0x14, 0x47, + 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4b, 0x75, 0x62, 0x65, 0x63, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x12, 0x0d, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, + 0x49, 0x6e, 0x1a, 0x18, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4b, + 0x75, 0x62, 0x65, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x4f, 0x75, 0x74, 0x12, 0x46, 0x0a, 0x13, + 0x4d, 0x61, 0x72, 0x6b, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4f, 0x6e, 0x6c, 0x69, 0x6e, + 0x65, 0x41, 0x74, 0x12, 0x16, 0x2e, 0x4d, 0x61, 0x72, 0x6b, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, + 0x72, 0x4f, 0x6e, 0x6c, 0x69, 0x6e, 0x65, 0x41, 0x74, 0x49, 0x6e, 0x1a, 0x17, 0x2e, 0x4d, 0x61, + 0x72, 0x6b, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4f, 0x6e, 0x6c, 0x69, 0x6e, 0x65, 0x41, + 0x74, 0x4f, 0x75, 0x74, 0x12, 0x58, 0x0a, 0x19, 0x45, 0x6e, 0x73, 0x75, 0x72, 0x65, 0x47, 0x6c, + 0x6f, 0x62, 0x61, 0x6c, 0x56, 0x50, 0x4e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x12, 0x1c, 0x2e, 0x45, 0x6e, 0x73, 0x75, 0x72, 0x65, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, + 0x56, 0x50, 0x4e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x1a, + 0x1d, 0x2e, 0x45, 0x6e, 0x73, 0x75, 0x72, 0x65, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x56, 0x50, + 0x4e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x75, 0x74, 0x12, 0x58, + 0x0a, 0x19, 0x47, 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x47, 0x61, 0x74, 0x65, + 0x77, 0x61, 0x79, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x1c, 0x2e, 0x47, 0x65, + 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x52, + 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x6e, 0x1a, 0x1d, 0x2e, 0x47, 0x65, 0x74, 0x43, + 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x52, 0x65, 0x73, + 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4f, 0x75, 0x74, 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x2f, 0x69, 0x6e, + 0x66, 0x72, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -877,7 +994,7 @@ func file_infra_proto_rawDescGZIP() []byte { return file_infra_proto_rawDescData } -var file_infra_proto_msgTypes = make([]protoimpl.MessageInfo, 11) +var file_infra_proto_msgTypes = make([]protoimpl.MessageInfo, 13) var file_infra_proto_goTypes = []any{ (*GetClusterIn)(nil), // 0: GetClusterIn (*GetClusterOut)(nil), // 1: GetClusterOut @@ -890,10 +1007,12 @@ var file_infra_proto_goTypes = []any{ (*MarkClusterOnlineAtOut)(nil), // 8: MarkClusterOnlineAtOut (*EnsureGlobalVPNConnectionIn)(nil), // 9: EnsureGlobalVPNConnectionIn (*EnsureGlobalVPNConnectionOut)(nil), // 10: EnsureGlobalVPNConnectionOut - (*timestamppb.Timestamp)(nil), // 11: google.protobuf.Timestamp + (*GetClusterGatewayResourceIn)(nil), // 11: GetClusterGatewayResourceIn + (*GetClusterGatewayResourceOut)(nil), // 12: GetClusterGatewayResourceOut + (*timestamppb.Timestamp)(nil), // 13: google.protobuf.Timestamp } var file_infra_proto_depIdxs = []int32{ - 11, // 0: MarkClusterOnlineAtIn.timestamp:type_name -> google.protobuf.Timestamp + 13, // 0: MarkClusterOnlineAtIn.timestamp:type_name -> google.protobuf.Timestamp 0, // 1: Infra.GetCluster:input_type -> GetClusterIn 0, // 2: Infra.GetByokCluster:input_type -> GetClusterIn 2, // 3: Infra.GetNodepool:input_type -> GetNodepoolIn @@ -901,15 +1020,17 @@ var file_infra_proto_depIdxs = []int32{ 0, // 5: Infra.GetClusterKubeconfig:input_type -> GetClusterIn 7, // 6: Infra.MarkClusterOnlineAt:input_type -> MarkClusterOnlineAtIn 9, // 7: Infra.EnsureGlobalVPNConnection:input_type -> EnsureGlobalVPNConnectionIn - 1, // 8: Infra.GetCluster:output_type -> GetClusterOut - 1, // 9: Infra.GetByokCluster:output_type -> GetClusterOut - 3, // 10: Infra.GetNodepool:output_type -> GetNodepoolOut - 5, // 11: Infra.ClusterExists:output_type -> ClusterExistsOut - 6, // 12: Infra.GetClusterKubeconfig:output_type -> GetClusterKubeconfigOut - 8, // 13: Infra.MarkClusterOnlineAt:output_type -> MarkClusterOnlineAtOut - 10, // 14: Infra.EnsureGlobalVPNConnection:output_type -> EnsureGlobalVPNConnectionOut - 8, // [8:15] is the sub-list for method output_type - 1, // [1:8] is the sub-list for method input_type + 11, // 8: Infra.GetClusterGatewayResource:input_type -> GetClusterGatewayResourceIn + 1, // 9: Infra.GetCluster:output_type -> GetClusterOut + 1, // 10: Infra.GetByokCluster:output_type -> GetClusterOut + 3, // 11: Infra.GetNodepool:output_type -> GetNodepoolOut + 5, // 12: Infra.ClusterExists:output_type -> ClusterExistsOut + 6, // 13: Infra.GetClusterKubeconfig:output_type -> GetClusterKubeconfigOut + 8, // 14: Infra.MarkClusterOnlineAt:output_type -> MarkClusterOnlineAtOut + 10, // 15: Infra.EnsureGlobalVPNConnection:output_type -> EnsureGlobalVPNConnectionOut + 12, // 16: Infra.GetClusterGatewayResource:output_type -> GetClusterGatewayResourceOut + 9, // [9:17] is the sub-list for method output_type + 1, // [1:9] is the sub-list for method input_type 1, // [1:1] is the sub-list for extension type_name 1, // [1:1] is the sub-list for extension extendee 0, // [0:1] is the sub-list for field type_name @@ -1053,6 +1174,30 @@ func file_infra_proto_init() { return nil } } + file_infra_proto_msgTypes[11].Exporter = func(v any, i int) any { + switch v := v.(*GetClusterGatewayResourceIn); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_infra_proto_msgTypes[12].Exporter = func(v any, i int) any { + switch v := v.(*GetClusterGatewayResourceOut); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -1060,7 +1205,7 @@ func file_infra_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_infra_proto_rawDesc, NumEnums: 0, - NumMessages: 11, + NumMessages: 13, NumExtensions: 0, NumServices: 1, }, diff --git a/apps/infra/protobufs/infra/infra_grpc.pb.go b/apps/infra/protobufs/infra/infra_grpc.pb.go index ab0452885..195b1ac26 100644 --- a/apps/infra/protobufs/infra/infra_grpc.pb.go +++ b/apps/infra/protobufs/infra/infra_grpc.pb.go @@ -26,6 +26,7 @@ const ( Infra_GetClusterKubeconfig_FullMethodName = "/Infra/GetClusterKubeconfig" Infra_MarkClusterOnlineAt_FullMethodName = "/Infra/MarkClusterOnlineAt" Infra_EnsureGlobalVPNConnection_FullMethodName = "/Infra/EnsureGlobalVPNConnection" + Infra_GetClusterGatewayResource_FullMethodName = "/Infra/GetClusterGatewayResource" ) // InfraClient is the client API for Infra service. @@ -39,6 +40,7 @@ type InfraClient interface { GetClusterKubeconfig(ctx context.Context, in *GetClusterIn, opts ...grpc.CallOption) (*GetClusterKubeconfigOut, error) MarkClusterOnlineAt(ctx context.Context, in *MarkClusterOnlineAtIn, opts ...grpc.CallOption) (*MarkClusterOnlineAtOut, error) EnsureGlobalVPNConnection(ctx context.Context, in *EnsureGlobalVPNConnectionIn, opts ...grpc.CallOption) (*EnsureGlobalVPNConnectionOut, error) + GetClusterGatewayResource(ctx context.Context, in *GetClusterGatewayResourceIn, opts ...grpc.CallOption) (*GetClusterGatewayResourceOut, error) } type infraClient struct { @@ -112,6 +114,15 @@ func (c *infraClient) EnsureGlobalVPNConnection(ctx context.Context, in *EnsureG return out, nil } +func (c *infraClient) GetClusterGatewayResource(ctx context.Context, in *GetClusterGatewayResourceIn, opts ...grpc.CallOption) (*GetClusterGatewayResourceOut, error) { + out := new(GetClusterGatewayResourceOut) + err := c.cc.Invoke(ctx, Infra_GetClusterGatewayResource_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // InfraServer is the server API for Infra service. // All implementations must embed UnimplementedInfraServer // for forward compatibility @@ -123,6 +134,7 @@ type InfraServer interface { GetClusterKubeconfig(context.Context, *GetClusterIn) (*GetClusterKubeconfigOut, error) MarkClusterOnlineAt(context.Context, *MarkClusterOnlineAtIn) (*MarkClusterOnlineAtOut, error) EnsureGlobalVPNConnection(context.Context, *EnsureGlobalVPNConnectionIn) (*EnsureGlobalVPNConnectionOut, error) + GetClusterGatewayResource(context.Context, *GetClusterGatewayResourceIn) (*GetClusterGatewayResourceOut, error) mustEmbedUnimplementedInfraServer() } @@ -151,6 +163,9 @@ func (UnimplementedInfraServer) MarkClusterOnlineAt(context.Context, *MarkCluste func (UnimplementedInfraServer) EnsureGlobalVPNConnection(context.Context, *EnsureGlobalVPNConnectionIn) (*EnsureGlobalVPNConnectionOut, error) { return nil, status.Errorf(codes.Unimplemented, "method EnsureGlobalVPNConnection not implemented") } +func (UnimplementedInfraServer) GetClusterGatewayResource(context.Context, *GetClusterGatewayResourceIn) (*GetClusterGatewayResourceOut, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetClusterGatewayResource not implemented") +} func (UnimplementedInfraServer) mustEmbedUnimplementedInfraServer() {} // UnsafeInfraServer may be embedded to opt out of forward compatibility for this service. @@ -290,6 +305,24 @@ func _Infra_EnsureGlobalVPNConnection_Handler(srv interface{}, ctx context.Conte return interceptor(ctx, in, info, handler) } +func _Infra_GetClusterGatewayResource_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetClusterGatewayResourceIn) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(InfraServer).GetClusterGatewayResource(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Infra_GetClusterGatewayResource_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(InfraServer).GetClusterGatewayResource(ctx, req.(*GetClusterGatewayResourceIn)) + } + return interceptor(ctx, in, info, handler) +} + // Infra_ServiceDesc is the grpc.ServiceDesc for Infra service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -325,6 +358,10 @@ var Infra_ServiceDesc = grpc.ServiceDesc{ MethodName: "EnsureGlobalVPNConnection", Handler: _Infra_EnsureGlobalVPNConnection_Handler, }, + { + MethodName: "GetClusterGatewayResource", + Handler: _Infra_GetClusterGatewayResource_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "infra.proto", diff --git a/apps/message-office/internal/app/app.go b/apps/message-office/internal/app/app.go index 553e910a5..50d97631b 100644 --- a/apps/message-office/internal/app/app.go +++ b/apps/message-office/internal/app/app.go @@ -4,13 +4,13 @@ import ( "context" "log/slog" + "github.com/kloudlite/api/apps/infra/protobufs/infra" "github.com/kloudlite/api/apps/message-office/internal/app/graph" "github.com/kloudlite/api/apps/message-office/internal/app/graph/generated" proto_rpc "github.com/kloudlite/api/apps/message-office/internal/app/proto-rpc" "github.com/kloudlite/api/apps/message-office/internal/domain" "github.com/kloudlite/api/apps/message-office/internal/entities" "github.com/kloudlite/api/apps/message-office/internal/env" - "github.com/kloudlite/api/grpc-interfaces/infra" "github.com/kloudlite/api/pkg/grpc" httpServer "github.com/kloudlite/api/pkg/http-server" "github.com/kloudlite/api/pkg/logging" diff --git a/apps/message-office/internal/app/grpc-server.go b/apps/message-office/internal/app/grpc-server.go index 6975ebd09..d83a87999 100644 --- a/apps/message-office/internal/app/grpc-server.go +++ b/apps/message-office/internal/app/grpc-server.go @@ -9,7 +9,7 @@ import ( "strings" "time" - "github.com/kloudlite/api/grpc-interfaces/infra" + "github.com/kloudlite/api/apps/infra/protobufs/infra" klErrors "github.com/kloudlite/api/pkg/errors" "github.com/kloudlite/api/pkg/grpc" @@ -50,6 +50,29 @@ type ( } ) +// SendClusterGatewayResource implements messages.MessageDispatchServiceServer. +func (g *grpcServer) SendClusterGatewayResource(ctx context.Context, _ *messages.Empty) (*messages.GatewayResource, error) { + accountName, clusterName, err := g.validateAndDecodeFromGrpcContext(ctx, g.ev.TokenHashingSecret) + if err != nil { + return nil, err + } + + out, err := g.infraClient.GetClusterGatewayResource(ctx, &infra.GetClusterGatewayResourceIn{ + AccountName: accountName, + ClusterName: clusterName, + }) + if err != nil { + return nil, err + } + + return &messages.GatewayResource{Gateway: out.Gateway}, nil +} + +// ReceiveIotConsoleResourceUpdate implements messages.MessageDispatchServiceServer. +func (g *grpcServer) ReceiveIotConsoleResourceUpdate(context.Context, *messages.ResourceUpdate) (*messages.Empty, error) { + panic("unimplemented") +} + // ReceiveConsoleResourceUpdate implements messages.MessageDispatchServiceServer. func (g *grpcServer) ReceiveConsoleResourceUpdate(ctx context.Context, msg *messages.ResourceUpdate) (_ *messages.Empty, err error) { accountName, clusterName, err := g.validateAndDecodeFromGrpcContext(ctx, g.ev.TokenHashingSecret) @@ -444,7 +467,6 @@ func dispatchResourceUpdate(ctx context.Context, receiver common.MessageReceiver func NewMessageOfficeServer(producer UpdatesProducer, jc *nats.JetstreamClient, ev *env.Env, d domain.Domain, logger *slog.Logger, infraCli infra.InfraClient) (messages.MessageDispatchServiceServer, error) { return &grpcServer{ - UnimplementedMessageDispatchServiceServer: messages.UnimplementedMessageDispatchServiceServer{}, infraClient: infraCli, logger: logger, updatesProducer: producer, diff --git a/apps/tenant-agent/main.go b/apps/tenant-agent/main.go index b080df346..2e64267d2 100644 --- a/apps/tenant-agent/main.go +++ b/apps/tenant-agent/main.go @@ -106,20 +106,12 @@ func (g *grpcHandler) handleMessage(_ context.Context, msg t.AgentMessage) error switch msg.Action { case t.ActionApply: { - // lb := obj.GetLabels() - // if lb == nil { - // lb = make(map[string]string, 1) - // } - // lb[constants.AccountNameKey] = msg.AccountName - // obj.SetLabels(lb) - b, err := yaml.Marshal(msg.Object) if err != nil { return g.handleErrorOnApply(ctx, err, msg) } if _, err := g.yamlClient.ApplyYAML(ctx, b); err != nil { - // mLogger.Errorf(err, "[%d] [error-on-apply]: yaml: \n%s\n", g.inMemCounter, b) mLogger.Error("failed to process message, got", "err", err, "error-on-apply:YAML", fmt.Sprintf("\n%s\n", b)) return g.handleErrorOnApply(ctx, err, msg) } @@ -132,6 +124,7 @@ func (g *grpcHandler) handleMessage(_ context.Context, msg t.AgentMessage) error mLogger.Info("processed message, resource does not exist, might already be deleted") return g.handleErrorOnApply(ctx, err, msg) } + mLogger.Error("failed to process message, got", "err", err) return g.handleErrorOnApply(ctx, err, msg) } @@ -276,6 +269,22 @@ func (g *grpcHandler) run(rctx context.Context) error { } } +func (g *grpcHandler) askForGatewayResource(rctx context.Context) error { + ctx := NewAuthorizedGrpcContext(rctx, g.ev.AccessToken) + + g.logger.Info("asking message office to send gateway resource for this cluster") + out, err := g.msgDispatchCli.SendClusterGatewayResource(ctx, &messages.Empty{}) + if err != nil { + return errors.NewE(err) + } + + if _, err := g.yamlClient.ApplyYAML(ctx, out.Gateway); err != nil { + g.logger.Error("failed to process message, got", "err", err, "error-on-apply:YAML", fmt.Sprintf("\n%s\n", out.Gateway)) + } + + return nil +} + func main() { var isDev bool flag.BoolVar(&isDev, "dev", false, "--dev") @@ -283,6 +292,9 @@ func main() { var debug bool flag.BoolVar(&debug, "debug", false, "--debug") + var kubeApiAddr string + flag.StringVar(&kubeApiAddr, "kube-api-addr", "localhost:8081", "--kube-api-addr [host]:port") + flag.Parse() start := time.Now() @@ -296,14 +308,14 @@ func main() { yamlClient := func() kubectl.YAMLClient { if isDev { - logger.Debug("connecting to k8s over", "local-addr", "localhost:8081") - return kubectl.NewYAMLClientOrDie(&rest.Config{Host: "localhost:8081"}, kubectl.YAMLClientOpts{}) + logger.Debug("connecting to k8s over", "local-addr", kubeApiAddr) + return kubectl.NewYAMLClientOrDie(&rest.Config{Host: kubeApiAddr}, kubectl.YAMLClientOpts{Slogger: logger}) } config, err := rest.InClusterConfig() if err != nil { panic(err) } - return kubectl.NewYAMLClientOrDie(config, kubectl.YAMLClientOpts{}) + return kubectl.NewYAMLClientOrDie(config, kubectl.YAMLClientOpts{Slogger: logger}) }() g := grpcHandler{ @@ -340,14 +352,18 @@ func main() { for { cc, err := libGrpc.NewGrpcClientV2(ev.GrpcAddr, libGrpc.GrpcConnectOpts{TLSConnect: !isDev, Logger: logger}) if err != nil { - logger.Error("failed to connect to message office, got", "err", err) - <-time.After(1 * time.Second) + logger.Error("failed to connect to message office, got", "err", err, "retrying after", "5s") + <-time.After(5 * time.Second) } g.msgDispatchCli = messages.NewMessageDispatchServiceClient(cc) if err := g.ensureAccessToken(); err != nil { logger.Error("ensuring access token, got", "err", err) + logger.Info("will retry after 5s") + cc.Close() + <-time.After(5 * time.Second) + continue } ctx, cf := context.WithTimeout(context.TODO(), MaxConnectionDuration) @@ -356,6 +372,14 @@ func main() { vps.realVectorClient = proto_rpc.NewVectorClient(cc) vps.connCancelFn = cf + if err := g.askForGatewayResource(ctx); err != nil { + logger.Error("asking gateway resource, got", "err", err) + cf() + cleanup(ctx, cc, logger) + logger.Info("will retry after 5s") + <-time.After(5 * time.Second) + } + go func() { defer cf() if err := g.run(ctx); err != nil { @@ -363,11 +387,15 @@ func main() { } }() - <-ctx.Done() - logger.Debug("MAX_CONNECTION_DURATION reached, will re-initialize connection") + cleanup(ctx, cc, logger) + } +} - if err = cc.Close(); err != nil { - logger.Error("Failed to close connection, got", "err", err) - } +func cleanup(ctx context.Context, cc libGrpc.Client, logger *slog.Logger) { + <-ctx.Done() + logger.Debug("MAX_CONNECTION_DURATION reached, will re-initialize connection") + + if err := cc.Close(); err != nil { + logger.Error("Failed to close connection, got", "err", err) } } diff --git a/pkg/functions/k8s-utils.go b/pkg/functions/k8s-utils.go index f4357c86f..dda7ed71a 100644 --- a/pkg/functions/k8s-utils.go +++ b/pkg/functions/k8s-utils.go @@ -2,9 +2,10 @@ package functions import ( "encoding/json" - "github.com/kloudlite/api/pkg/errors" "regexp" + "github.com/kloudlite/api/pkg/errors" + "github.com/kloudlite/api/constants" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" From d87efdd5ada422e1907f6b3ddfdd8dabfcfc10b0 Mon Sep 17 00:00:00 2001 From: nxtcoder17 Date: Thu, 24 Oct 2024 23:36:55 +0530 Subject: [PATCH 2/2] chore: update kloudlite/operator to v1.1.1 --- apps/tenant-agent/main.go | 1 + go.mod | 2 +- go.sum | 2 ++ 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/apps/tenant-agent/main.go b/apps/tenant-agent/main.go index 2e64267d2..805983f5e 100644 --- a/apps/tenant-agent/main.go +++ b/apps/tenant-agent/main.go @@ -280,6 +280,7 @@ func (g *grpcHandler) askForGatewayResource(rctx context.Context) error { if _, err := g.yamlClient.ApplyYAML(ctx, out.Gateway); err != nil { g.logger.Error("failed to process message, got", "err", err, "error-on-apply:YAML", fmt.Sprintf("\n%s\n", out.Gateway)) + return err } return nil diff --git a/go.mod b/go.mod index 76cc082f3..7d9ecf94b 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( github.com/go-chi/chi/v5 v5.0.10 github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 github.com/kloudlite/container-registry-authorizer v0.0.0-20231021122509-161dc30fde55 - github.com/kloudlite/operator v0.0.0-20241009112404-194113cddc17 + github.com/kloudlite/operator v1.0.1-0.20241024180107-6c3c50b884a6 github.com/miekg/dns v1.1.57 github.com/mittwald/go-helm-client v0.12.14 github.com/nats-io/nats.go v1.31.0 diff --git a/go.sum b/go.sum index 3f4a68fc8..e07201b06 100644 --- a/go.sum +++ b/go.sum @@ -332,6 +332,8 @@ github.com/kloudlite/container-registry-authorizer v0.0.0-20231021122509-161dc30 github.com/kloudlite/container-registry-authorizer v0.0.0-20231021122509-161dc30fde55/go.mod h1:GZj3wZmIw/qCciclRhgQTgmGiqe8wxoVzMXQjbOfnbc= github.com/kloudlite/operator v0.0.0-20241009112404-194113cddc17 h1:BeVmWJWfAy1dkClLTLa07cCmB+LPF75mYYaGs3tz4Qg= github.com/kloudlite/operator v0.0.0-20241009112404-194113cddc17/go.mod h1:VkreINDW43qeTsDv9gfGH5M9c5OG/jPGYOGxj8otsGY= +github.com/kloudlite/operator v1.0.1-0.20241024180107-6c3c50b884a6 h1:fTiXbFpdyCihZ74Cs1Of2JDKRO4r6MeIU33yPWY6nW4= +github.com/kloudlite/operator v1.0.1-0.20241024180107-6c3c50b884a6/go.mod h1:VkreINDW43qeTsDv9gfGH5M9c5OG/jPGYOGxj8otsGY= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=