From 03f77d3e6d09569d2d219924dbb4cf71aea70f7a Mon Sep 17 00:00:00 2001 From: Matee Ullah Malik Date: Mon, 15 Dec 2025 16:13:40 +0500 Subject: [PATCH] sdk: include iteration in upload/download events --- sdk/task/cascade.go | 21 +++++++++++++-------- sdk/task/download.go | 26 +++++++++++++++----------- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/sdk/task/cascade.go b/sdk/task/cascade.go index c467e339..af417c43 100644 --- a/sdk/task/cascade.go +++ b/sdk/task/cascade.go @@ -86,12 +86,6 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum return fmt.Errorf("failed to create client factory: %w", err) } - req := &supernodeservice.CascadeSupernodeRegisterRequest{ - FilePath: t.filePath, - ActionID: t.ActionID, - TaskId: t.TaskID, - } - ordered := supernodes // Ensure any unused preClients are closed when we return @@ -125,7 +119,12 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum } } - if err := t.attemptRegistration(ctx, iteration-1, sn, clientFactory, req, pre); err != nil { + req := &supernodeservice.CascadeSupernodeRegisterRequest{ + FilePath: t.filePath, + ActionID: t.ActionID, + TaskId: t.TaskID, + } + if err := t.attemptRegistration(ctx, iteration, sn, clientFactory, req, pre); err != nil { t.LogEvent(ctx, event.SDKRegistrationFailure, "registration with supernode failed", event.EventData{ event.KeySupernode: sn.GrpcEndpoint, event.KeySupernodeAddress: sn.CosmosAddress, @@ -152,7 +151,7 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum return fmt.Errorf("failed to upload to all supernodes") } -func (t *CascadeTask) attemptRegistration(ctx context.Context, _ int, sn lumera.Supernode, factory *net.ClientFactory, req *supernodeservice.CascadeSupernodeRegisterRequest, preClient net.SupernodeClient) error { +func (t *CascadeTask) attemptRegistration(ctx context.Context, iteration int, sn lumera.Supernode, factory *net.ClientFactory, req *supernodeservice.CascadeSupernodeRegisterRequest, preClient net.SupernodeClient) error { var client net.SupernodeClient var err error if preClient != nil { @@ -169,9 +168,14 @@ func (t *CascadeTask) attemptRegistration(ctx context.Context, _ int, sn lumera. t.LogEvent(ctx, event.SDKConnectionEstablished, "Connection to supernode established", event.EventData{ event.KeySupernode: sn.GrpcEndpoint, event.KeySupernodeAddress: sn.CosmosAddress, + event.KeyIteration: iteration, }) req.EventLogger = func(ctx context.Context, evt event.EventType, msg string, data event.EventData) { + if data == nil { + data = make(event.EventData) + } + data[event.KeyIteration] = iteration t.LogEvent(ctx, evt, msg, data) } // Use ctx directly; per-phase timers are applied inside the adapter @@ -200,6 +204,7 @@ func (t *CascadeTask) attemptRegistration(ctx context.Context, _ int, sn lumera. t.LogEvent(ctx, event.SDKTaskTxHashReceived, "txhash received", event.EventData{ event.KeyTxHash: resp.TxHash, event.KeySupernode: sn.CosmosAddress, + event.KeyIteration: iteration, }) return nil diff --git a/sdk/task/download.go b/sdk/task/download.go index bc8ca06f..98e0578e 100644 --- a/sdk/task/download.go +++ b/sdk/task/download.go @@ -85,17 +85,10 @@ func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supern } }() - req := &supernodeservice.CascadeSupernodeDownloadRequest{ - ActionID: t.actionId, - TaskID: t.TaskID, - OutputPath: t.outputPath, - Signature: t.signature, - } - // Remove existing file once before starting attempts to allow overwrite - if _, err := os.Stat(req.OutputPath); err == nil { - if removeErr := os.Remove(req.OutputPath); removeErr != nil { - return fmt.Errorf("failed to remove existing file %s: %w", req.OutputPath, removeErr) + if _, err := os.Stat(t.outputPath); err == nil { + if removeErr := os.Remove(t.outputPath); removeErr != nil { + return fmt.Errorf("failed to remove existing file %s: %w", t.outputPath, removeErr) } } @@ -124,7 +117,13 @@ func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supern delete(preClients, sn.CosmosAddress) } } - if err := t.attemptDownload(ctx, sn, clientFactory, req, pre); err != nil { + req := &supernodeservice.CascadeSupernodeDownloadRequest{ + ActionID: t.actionId, + TaskID: t.TaskID, + OutputPath: t.outputPath, + Signature: t.signature, + } + if err := t.attemptDownload(ctx, iteration, sn, clientFactory, req, pre); err != nil { // Log failure and continue with the rest t.LogEvent(ctx, event.SDKDownloadFailure, "download from super-node failed", event.EventData{ event.KeySupernode: sn.GrpcEndpoint, @@ -148,6 +147,7 @@ func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supern func (t *CascadeDownloadTask) attemptDownload( parent context.Context, + iteration int, sn lumera.Supernode, factory *net.ClientFactory, req *supernodeservice.CascadeSupernodeDownloadRequest, @@ -169,6 +169,10 @@ func (t *CascadeDownloadTask) attemptDownload( defer client.Close(ctx) req.EventLogger = func(ctx context.Context, evt event.EventType, msg string, data event.EventData) { + if data == nil { + data = make(event.EventData) + } + data[event.KeyIteration] = iteration t.LogEvent(ctx, evt, msg, data) }