Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions internal/transport/connect_compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ func TestConnectRPCCompressionIntegration(t *testing.T) {
// No compression options
)
dakrClient = &RealDakrClient{
logger: logr.Discard().WithName("test-dakr-client"),
client: client,
clusterToken: "test-token",
logger: logr.Discard().WithName("test-dakr-client"),
client: client,
clientHeaders: NewClientHeaders("test-token"),
}
}

Expand Down
124 changes: 109 additions & 15 deletions internal/transport/dakr_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"math"
"net"
"net/http"
"sync"
"time"

"connectrpc.com/connect"
"github.com/devzero-inc/zxporter/internal/collector"
"github.com/devzero-inc/zxporter/internal/version"
"github.com/go-logr/logr"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
Expand All @@ -31,6 +33,14 @@ const (
maxSendBatchSize = 32 * 1024 * 1024 // 32MB uncompressed
// Maximum read batch size for grpc client (uncompressed size limit)
maxReadBatchSize = 32 * 1024 * 1024 // 32MB uncompressed

DefaultOperatorType = "zxporter"

// Header keys for client identification in transport layer
HeaderClient = "X-Client"
HeaderOperatorType = "X-Operator-Type"
HeaderOperatorVersion = "X-Operator-Version"
HeaderOperatorGitSHA = "X-Operator-Git-SHA"
)

// RetryPolicy defines the parameters for retrying.
Expand All @@ -41,16 +51,83 @@ type RetryPolicy struct {
IsRetryable func(err error) bool
}

// ClientHeaders holds the operator identification information for transport headers.
// This struct is used by the interceptor to access current client information.
type ClientHeaders struct {
mu sync.RWMutex
clusterToken string
version string
gitCommit string
operatorType string
}

// NewClientHeaders creates a new ClientHeaders instance
func NewClientHeaders(clusterToken string) *ClientHeaders {
versionInfo := version.Get()
return &ClientHeaders{
clusterToken: clusterToken,
version: versionInfo.String(),
gitCommit: versionInfo.GitCommit,
operatorType: DefaultOperatorType,
}
}

// SetClusterToken updates the cluster token (thread-safe)
func (h *ClientHeaders) SetClusterToken(token string) {
h.mu.Lock()
defer h.mu.Unlock()
h.clusterToken = token
}

// GetClusterToken returns the current cluster token (thread-safe)
func (h *ClientHeaders) GetClusterToken() string {
h.mu.RLock()
defer h.mu.RUnlock()
return h.clusterToken
}

// AttachToRequest adds all client identification headers to a Connect request.
// This is the core method that sets:
// - Authorization: Bearer token for authentication
// - X-Client: Combined operator type and version (e.g., "zxporter/1.0.0")
// - X-Operator-Type: The operator type identifier
// - X-Operator-Version: The operator version
// - X-Operator-Git-SHA: The git commit SHA of the operator build
func (h *ClientHeaders) AttachToRequest(header http.Header) {
h.mu.RLock()
defer h.mu.RUnlock()

if h.clusterToken != "" {
header.Set("Authorization", fmt.Sprintf("Bearer %s", h.clusterToken))
}

clientHeader := h.operatorType
if h.version != "" {
clientHeader = fmt.Sprintf("%s/%s", h.operatorType, h.version)
}
header.Set(HeaderClient, clientHeader)

header.Set(HeaderOperatorType, h.operatorType)
if h.version != "" {
header.Set(HeaderOperatorVersion, h.version)
}
if h.gitCommit != "" {
header.Set(HeaderOperatorGitSHA, h.gitCommit)
}
}

// RealDakrClient implements communication with Dakr service
type RealDakrClient struct {
logger logr.Logger
client genconnect.MetricsCollectorServiceClient
clusterClient genconnect.ClusterServiceClient
clusterToken string
clientHeaders *ClientHeaders
}

// NewDakrClient creates a new client for Dakr service
func NewDakrClient(dakrBaseURL string, clusterToken string, logger logr.Logger) DakrClient {
clientHeaders := NewClientHeaders(clusterToken)

// Define the retry policy
retryPolicy := RetryPolicy{
MaxAttempts: 3,
Expand Down Expand Up @@ -118,6 +195,8 @@ func NewDakrClient(dakrBaseURL string, clusterToken string, logger logr.Logger)
})
})

clientHeadersInterceptor := newClientHeadersInterceptor(clientHeaders)

// Create custom HTTP client with improved timeout and connection pool settings
httpClient := &http.Client{
Timeout: 120 * time.Second,
Expand All @@ -140,7 +219,7 @@ func NewDakrClient(dakrBaseURL string, clusterToken string, logger logr.Logger)
connect.WithGRPC(),
connect.WithSendGzip(), // Enable gzip compression for requests
connect.WithCompressMinBytes(1024), // Only compress if payload > 1KB
connect.WithInterceptors(retryInterceptor),
connect.WithInterceptors(clientHeadersInterceptor, retryInterceptor),
connect.WithSendMaxBytes(maxSendBatchSize),
connect.WithReadMaxBytes(maxReadBatchSize),
}
Expand All @@ -163,10 +242,19 @@ func NewDakrClient(dakrBaseURL string, clusterToken string, logger logr.Logger)
logger: logger.WithName("dakr-client"),
client: client,
clusterClient: clusterClient,
clusterToken: clusterToken,
clientHeaders: clientHeaders,
}
}

func newClientHeadersInterceptor(headers *ClientHeaders) connect.UnaryInterceptorFunc {
return connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc {
return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
headers.AttachToRequest(req.Header())
return next(ctx, req)
})
})
}

// toStructpb converts an arbitrary object to structpb.Struct
func (c *RealDakrClient) toStructpb(data interface{}) (*structpb.Struct, error) {
// First convert to JSON
Expand Down Expand Up @@ -204,7 +292,6 @@ func (c *RealDakrClient) SendResource(ctx context.Context, resource collector.Co
}

req := connect.NewRequest(res)
attachClusterToken(req, c.clusterToken)

if ctx.Value("cluster_id") != nil {
clusterString, _ := ctx.Value("cluster_id").(string)
Expand Down Expand Up @@ -347,7 +434,6 @@ func (c *RealDakrClient) SendResourceBatch(ctx context.Context, resources []coll
}

req := connect.NewRequest(batchReq)
attachClusterToken(req, c.clusterToken)

if ctx.Value("cluster_id") != nil {
clusterString, _ := ctx.Value("cluster_id").(string)
Expand Down Expand Up @@ -399,7 +485,6 @@ func (c *RealDakrClient) SendTelemetryMetrics(ctx context.Context, metrics []*dt
}

req := connect.NewRequest(telemetryReq)
attachClusterToken(req, c.clusterToken)

// Send to Dakr
resp, err := c.client.SendTelemetryMetrics(ctx, req)
Expand All @@ -411,10 +496,6 @@ func (c *RealDakrClient) SendTelemetryMetrics(ctx context.Context, metrics []*dt
return resp.Msg.ProcessedCount, nil
}

func attachClusterToken[T any](req *connect.Request[T], clusterToken string) {
req.Header().Set("Authorization", fmt.Sprintf("Bearer %s", clusterToken))
}

// SendClusterSnapshotStream sends cluster snapshot data in chunks via streaming
func (c *RealDakrClient) SendClusterSnapshotStream(ctx context.Context, snapshot *gen.ClusterSnapshot, snapshotID string, timestamp time.Time) (string, *gen.ClusterSnapshot, error) {
c.logger.Info("Sending cluster snapshot", "snapshotId", snapshotID)
Expand All @@ -430,7 +511,8 @@ func (c *RealDakrClient) SendClusterSnapshotStream(ctx context.Context, snapshot

// Establish the stream
stream := c.client.SendClusterSnapshotStream(ctx)
stream.RequestHeader().Set("Authorization", fmt.Sprintf("Bearer %s", c.clusterToken))

c.clientHeaders.AttachToRequest(stream.RequestHeader())

clusterID := ""
teamID := ""
Expand Down Expand Up @@ -490,7 +572,6 @@ func (c *RealDakrClient) SendTelemetryLogs(ctx context.Context, in *gen.SendTele
c.logger.V(1).Info("Sending telemetry logs to Dakr", "count", len(in.Logs))

req := connect.NewRequest(in)
attachClusterToken(req, c.clusterToken)

// Set cluster and team IDs if they are not already in the request body
// (this is where we set the cluster id and team id, my assumption is that we always have those thing in our parent context)
Expand Down Expand Up @@ -523,10 +604,24 @@ func (c *RealDakrClient) ExchangePATForClusterToken(ctx context.Context, patToke
ClusterName: clusterName,
K8SProvider: k8sProvider,
})

// Add PAT token to the request header
// Add PAT token to the request header (overrides the default client headers auth)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Bug: Interceptor overwrites PAT token if clusterToken is non-empty

In ExchangePATForClusterToken, the PAT token is set on the request header at line 608 before the request is dispatched through the interceptor chain. When c.clusterClient.CreateClusterToken(ctx, req) is called, the clientHeadersInterceptor executes first and calls AttachToRequest(req.Header()), which sets Authorization: Bearer {clusterToken}overwriting the manually set PAT token.

Currently, the only call site in custom.go creates a temporary client with an empty cluster token (NewDakrClient(dakrURL, "", c.Log)), so the interceptor's if h.clusterToken != "" guard prevents the overwrite. However, this is a latent bug:

  1. The code's comment ("overrides the default client headers auth") implies the manual set happens after the interceptor, but it's the reverse — the interceptor runs when the RPC is dispatched.
  2. If this method is ever called on a client with a non-empty cluster token (e.g., after token rotation), authentication will silently break.
  3. The identification headers are also set twice — once manually and once by the interceptor — which is wasteful.

Suggested fix: Instead of manually setting headers, either:

  • Skip the interceptor for this specific call by using a separate client without the interceptor, OR
  • Set the PAT token via the interceptor by temporarily swapping the token, OR
  • Better yet, refactor AttachToRequest to accept an optional auth token override:
func (h *ClientHeaders) AttachToRequest(header http.Header, authOverride ...string) {
    h.mu.RLock()
    defer h.mu.RUnlock()
    
    token := h.clusterToken
    if len(authOverride) > 0 && authOverride[0] != "" {
        token = authOverride[0]
    }
    if token != "" {
        header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
    }
    // ... rest of identification headers
}

Then in ExchangePATForClusterToken:

c.clientHeaders.AttachToRequest(req.Header(), patToken)

This eliminates the duplicated logic and avoids the interceptor ordering issue.

Was this helpful? React with 👍 / 👎

req.Header().Set("Authorization", fmt.Sprintf("Bearer %s", patToken))

// Note: We use the clientHeaders but manually override the Authorization header above
versionInfo := version.Get()
clientHeader := DefaultOperatorType
if versionInfo.String() != "" {
clientHeader = fmt.Sprintf("%s/%s", DefaultOperatorType, versionInfo.String())
}
req.Header().Set(HeaderClient, clientHeader)
req.Header().Set(HeaderOperatorType, DefaultOperatorType)
if versionInfo.String() != "" {
req.Header().Set(HeaderOperatorVersion, versionInfo.String())
}
if versionInfo.GitCommit != "" {
req.Header().Set(HeaderOperatorGitSHA, versionInfo.GitCommit)
}

// Call the cluster service
resp, err := c.clusterClient.CreateClusterToken(ctx, req)
if err != nil {
Expand All @@ -541,7 +636,6 @@ func (c *RealDakrClient) ExchangePATForClusterToken(ctx context.Context, patToke
// SendNetworkTrafficMetrics pushes network traffic metrics from a node
func (c *RealDakrClient) SendNetworkTrafficMetrics(ctx context.Context, req *gen.SendNetworkTrafficMetricsRequest) (*gen.SendNetworkTrafficMetricsResponse, error) {
connectReq := connect.NewRequest(req)
attachClusterToken(connectReq, c.clusterToken)

// Pass through context values if needed (e.g. cluster_id)
if ctx.Value("cluster_id") != nil {
Expand Down