diff --git a/internal/transport/connect_compression_test.go b/internal/transport/connect_compression_test.go index a333caac..d87cc8b3 100644 --- a/internal/transport/connect_compression_test.go +++ b/internal/transport/connect_compression_test.go @@ -174,9 +174,9 @@ func TestConnectRPCCompressionIntegration(t *testing.T) { // No compression options ) dakrClient = &RealDakrClient{ - logger: logr.Discard().WithName("test-dakr-client"), - client: client, - clusterToken: "test-token", + logger: logr.Discard().WithName("test-dakr-client"), + client: client, + clientHeaders: NewClientHeaders("test-token"), } } diff --git a/internal/transport/dakr_client.go b/internal/transport/dakr_client.go index 36d53cc4..fd0e4c25 100644 --- a/internal/transport/dakr_client.go +++ b/internal/transport/dakr_client.go @@ -8,10 +8,12 @@ import ( "math" "net" "net/http" + "sync" "time" "connectrpc.com/connect" "github.com/devzero-inc/zxporter/internal/collector" + "github.com/devzero-inc/zxporter/internal/version" "github.com/go-logr/logr" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/structpb" @@ -31,6 +33,14 @@ const ( maxSendBatchSize = 32 * 1024 * 1024 // 32MB uncompressed // Maximum read batch size for grpc client (uncompressed size limit) maxReadBatchSize = 32 * 1024 * 1024 // 32MB uncompressed + + DefaultOperatorType = "zxporter" + + // Header keys for client identification in transport layer + HeaderClient = "X-Client" + HeaderOperatorType = "X-Operator-Type" + HeaderOperatorVersion = "X-Operator-Version" + HeaderOperatorGitSHA = "X-Operator-Git-SHA" ) // RetryPolicy defines the parameters for retrying. @@ -41,16 +51,83 @@ type RetryPolicy struct { IsRetryable func(err error) bool } +// ClientHeaders holds the operator identification information for transport headers. +// This struct is used by the interceptor to access current client information. 
+type ClientHeaders struct { + mu sync.RWMutex + clusterToken string + version string + gitCommit string + operatorType string +} + +// NewClientHeaders creates a new ClientHeaders instance +func NewClientHeaders(clusterToken string) *ClientHeaders { + versionInfo := version.Get() + return &ClientHeaders{ + clusterToken: clusterToken, + version: versionInfo.String(), + gitCommit: versionInfo.GitCommit, + operatorType: DefaultOperatorType, + } +} + +// SetClusterToken updates the cluster token (thread-safe) +func (h *ClientHeaders) SetClusterToken(token string) { + h.mu.Lock() + defer h.mu.Unlock() + h.clusterToken = token +} + +// GetClusterToken returns the current cluster token (thread-safe) +func (h *ClientHeaders) GetClusterToken() string { + h.mu.RLock() + defer h.mu.RUnlock() + return h.clusterToken +} + +// AttachToRequest adds all client identification headers to a Connect request. +// This is the core method that sets: +// - Authorization: Bearer token for authentication +// - X-Client: Combined operator type and version (e.g., "zxporter/1.0.0") +// - X-Operator-Type: The operator type identifier +// - X-Operator-Version: The operator version +// - X-Operator-Git-SHA: The git commit SHA of the operator build +func (h *ClientHeaders) AttachToRequest(header http.Header) { + h.mu.RLock() + defer h.mu.RUnlock() + + if h.clusterToken != "" { + header.Set("Authorization", fmt.Sprintf("Bearer %s", h.clusterToken)) + } + + clientHeader := h.operatorType + if h.version != "" { + clientHeader = fmt.Sprintf("%s/%s", h.operatorType, h.version) + } + header.Set(HeaderClient, clientHeader) + + header.Set(HeaderOperatorType, h.operatorType) + if h.version != "" { + header.Set(HeaderOperatorVersion, h.version) + } + if h.gitCommit != "" { + header.Set(HeaderOperatorGitSHA, h.gitCommit) + } +} + // RealDakrClient implements communication with Dakr service type RealDakrClient struct { logger logr.Logger client genconnect.MetricsCollectorServiceClient clusterClient 
genconnect.ClusterServiceClient - clusterToken string + clientHeaders *ClientHeaders } // NewDakrClient creates a new client for Dakr service func NewDakrClient(dakrBaseURL string, clusterToken string, logger logr.Logger) DakrClient { + clientHeaders := NewClientHeaders(clusterToken) + // Define the retry policy retryPolicy := RetryPolicy{ MaxAttempts: 3, @@ -118,6 +195,8 @@ func NewDakrClient(dakrBaseURL string, clusterToken string, logger logr.Logger) }) }) + clientHeadersInterceptor := newClientHeadersInterceptor(clientHeaders) + // Create custom HTTP client with improved timeout and connection pool settings httpClient := &http.Client{ Timeout: 120 * time.Second, @@ -140,7 +219,7 @@ func NewDakrClient(dakrBaseURL string, clusterToken string, logger logr.Logger) connect.WithGRPC(), connect.WithSendGzip(), // Enable gzip compression for requests connect.WithCompressMinBytes(1024), // Only compress if payload > 1KB - connect.WithInterceptors(retryInterceptor), + connect.WithInterceptors(clientHeadersInterceptor, retryInterceptor), connect.WithSendMaxBytes(maxSendBatchSize), connect.WithReadMaxBytes(maxReadBatchSize), } @@ -163,10 +242,19 @@ func NewDakrClient(dakrBaseURL string, clusterToken string, logger logr.Logger) logger: logger.WithName("dakr-client"), client: client, clusterClient: clusterClient, - clusterToken: clusterToken, + clientHeaders: clientHeaders, } } +func newClientHeadersInterceptor(headers *ClientHeaders) connect.UnaryInterceptorFunc { + return connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc { + return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { + headers.AttachToRequest(req.Header()) + return next(ctx, req) + }) + }) +} + // toStructpb converts an arbitrary object to structpb.Struct func (c *RealDakrClient) toStructpb(data interface{}) (*structpb.Struct, error) { // First convert to JSON @@ -204,7 +292,6 @@ func (c *RealDakrClient) SendResource(ctx 
context.Context, resource collector.Co } req := connect.NewRequest(res) - attachClusterToken(req, c.clusterToken) if ctx.Value("cluster_id") != nil { clusterString, _ := ctx.Value("cluster_id").(string) @@ -347,7 +434,6 @@ func (c *RealDakrClient) SendResourceBatch(ctx context.Context, resources []coll } req := connect.NewRequest(batchReq) - attachClusterToken(req, c.clusterToken) if ctx.Value("cluster_id") != nil { clusterString, _ := ctx.Value("cluster_id").(string) @@ -399,7 +485,6 @@ func (c *RealDakrClient) SendTelemetryMetrics(ctx context.Context, metrics []*dt } req := connect.NewRequest(telemetryReq) - attachClusterToken(req, c.clusterToken) // Send to Dakr resp, err := c.client.SendTelemetryMetrics(ctx, req) @@ -411,10 +496,6 @@ func (c *RealDakrClient) SendTelemetryMetrics(ctx context.Context, metrics []*dt return resp.Msg.ProcessedCount, nil } -func attachClusterToken[T any](req *connect.Request[T], clusterToken string) { - req.Header().Set("Authorization", fmt.Sprintf("Bearer %s", clusterToken)) -} - // SendClusterSnapshotStream sends cluster snapshot data in chunks via streaming func (c *RealDakrClient) SendClusterSnapshotStream(ctx context.Context, snapshot *gen.ClusterSnapshot, snapshotID string, timestamp time.Time) (string, *gen.ClusterSnapshot, error) { c.logger.Info("Sending cluster snapshot", "snapshotId", snapshotID) @@ -430,7 +511,8 @@ func (c *RealDakrClient) SendClusterSnapshotStream(ctx context.Context, snapshot // Establish the stream stream := c.client.SendClusterSnapshotStream(ctx) - stream.RequestHeader().Set("Authorization", fmt.Sprintf("Bearer %s", c.clusterToken)) + + c.clientHeaders.AttachToRequest(stream.RequestHeader()) clusterID := "" teamID := "" @@ -490,7 +572,6 @@ func (c *RealDakrClient) SendTelemetryLogs(ctx context.Context, in *gen.SendTele c.logger.V(1).Info("Sending telemetry logs to Dakr", "count", len(in.Logs)) req := connect.NewRequest(in) - attachClusterToken(req, c.clusterToken) // Set cluster and team IDs if they 
are not already in the request body // (this is where we set the cluster id and team id, my assumption is that we always have those thing in our parent context) @@ -523,10 +604,24 @@ func (c *RealDakrClient) ExchangePATForClusterToken(ctx context.Context, patToke ClusterName: clusterName, K8SProvider: k8sProvider, }) - - // Add PAT token to the request header + // Add PAT token to the request header (overrides the default client headers auth) req.Header().Set("Authorization", fmt.Sprintf("Bearer %s", patToken)) + // Note: identification headers are set from version.Get() directly, not via c.clientHeaders.AttachToRequest, which would set Authorization from the cluster token when one is present + versionInfo := version.Get() + clientHeader := DefaultOperatorType + if versionInfo.String() != "" { + clientHeader = fmt.Sprintf("%s/%s", DefaultOperatorType, versionInfo.String()) + } + req.Header().Set(HeaderClient, clientHeader) + req.Header().Set(HeaderOperatorType, DefaultOperatorType) + if versionInfo.String() != "" { + req.Header().Set(HeaderOperatorVersion, versionInfo.String()) + } + if versionInfo.GitCommit != "" { + req.Header().Set(HeaderOperatorGitSHA, versionInfo.GitCommit) + } + // Call the cluster service resp, err := c.clusterClient.CreateClusterToken(ctx, req) if err != nil { @@ -541,7 +636,6 @@ func (c *RealDakrClient) ExchangePATForClusterToken(ctx context.Context, patToke // SendNetworkTrafficMetrics pushes network traffic metrics from a node func (c *RealDakrClient) SendNetworkTrafficMetrics(ctx context.Context, req *gen.SendNetworkTrafficMetricsRequest) (*gen.SendNetworkTrafficMetricsResponse, error) { connectReq := connect.NewRequest(req) - attachClusterToken(connectReq, c.clusterToken) // Pass through context values if needed (e.g. cluster_id) if ctx.Value("cluster_id") != nil {