Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions sdk/task/cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,6 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum
return fmt.Errorf("failed to create client factory: %w", err)
}

req := &supernodeservice.CascadeSupernodeRegisterRequest{
FilePath: t.filePath,
ActionID: t.ActionID,
TaskId: t.TaskID,
}

ordered := supernodes

// Ensure any unused preClients are closed when we return
Expand Down Expand Up @@ -125,7 +119,12 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum
}
}

if err := t.attemptRegistration(ctx, iteration-1, sn, clientFactory, req, pre); err != nil {
req := &supernodeservice.CascadeSupernodeRegisterRequest{
FilePath: t.filePath,
ActionID: t.ActionID,
TaskId: t.TaskID,
}
if err := t.attemptRegistration(ctx, iteration, sn, clientFactory, req, pre); err != nil {
t.LogEvent(ctx, event.SDKRegistrationFailure, "registration with supernode failed", event.EventData{
event.KeySupernode: sn.GrpcEndpoint,
event.KeySupernodeAddress: sn.CosmosAddress,
Expand All @@ -152,7 +151,7 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum
return fmt.Errorf("failed to upload to all supernodes")
}

func (t *CascadeTask) attemptRegistration(ctx context.Context, _ int, sn lumera.Supernode, factory *net.ClientFactory, req *supernodeservice.CascadeSupernodeRegisterRequest, preClient net.SupernodeClient) error {
func (t *CascadeTask) attemptRegistration(ctx context.Context, iteration int, sn lumera.Supernode, factory *net.ClientFactory, req *supernodeservice.CascadeSupernodeRegisterRequest, preClient net.SupernodeClient) error {
var client net.SupernodeClient
var err error
if preClient != nil {
Expand All @@ -169,9 +168,14 @@ func (t *CascadeTask) attemptRegistration(ctx context.Context, _ int, sn lumera.
t.LogEvent(ctx, event.SDKConnectionEstablished, "Connection to supernode established", event.EventData{
event.KeySupernode: sn.GrpcEndpoint,
event.KeySupernodeAddress: sn.CosmosAddress,
event.KeyIteration: iteration,
})

req.EventLogger = func(ctx context.Context, evt event.EventType, msg string, data event.EventData) {
if data == nil {
data = make(event.EventData)
}
data[event.KeyIteration] = iteration
t.LogEvent(ctx, evt, msg, data)
}
// Use ctx directly; per-phase timers are applied inside the adapter
Expand Down Expand Up @@ -200,6 +204,7 @@ func (t *CascadeTask) attemptRegistration(ctx context.Context, _ int, sn lumera.
t.LogEvent(ctx, event.SDKTaskTxHashReceived, "txhash received", event.EventData{
event.KeyTxHash: resp.TxHash,
event.KeySupernode: sn.CosmosAddress,
event.KeyIteration: iteration,
})

return nil
Expand Down
26 changes: 15 additions & 11 deletions sdk/task/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,10 @@ func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supern
}
}()

req := &supernodeservice.CascadeSupernodeDownloadRequest{
ActionID: t.actionId,
TaskID: t.TaskID,
OutputPath: t.outputPath,
Signature: t.signature,
}

// Remove existing file once before starting attempts to allow overwrite
if _, err := os.Stat(req.OutputPath); err == nil {
if removeErr := os.Remove(req.OutputPath); removeErr != nil {
return fmt.Errorf("failed to remove existing file %s: %w", req.OutputPath, removeErr)
if _, err := os.Stat(t.outputPath); err == nil {
if removeErr := os.Remove(t.outputPath); removeErr != nil {
return fmt.Errorf("failed to remove existing file %s: %w", t.outputPath, removeErr)
}
}

Expand Down Expand Up @@ -124,7 +117,13 @@ func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supern
delete(preClients, sn.CosmosAddress)
}
}
if err := t.attemptDownload(ctx, sn, clientFactory, req, pre); err != nil {
req := &supernodeservice.CascadeSupernodeDownloadRequest{
ActionID: t.actionId,
TaskID: t.TaskID,
OutputPath: t.outputPath,
Signature: t.signature,
}
if err := t.attemptDownload(ctx, iteration, sn, clientFactory, req, pre); err != nil {
// Log failure and continue with the rest
t.LogEvent(ctx, event.SDKDownloadFailure, "download from super-node failed", event.EventData{
event.KeySupernode: sn.GrpcEndpoint,
Expand All @@ -148,6 +147,7 @@ func (t *CascadeDownloadTask) downloadFromSupernodes(ctx context.Context, supern

func (t *CascadeDownloadTask) attemptDownload(
parent context.Context,
iteration int,
sn lumera.Supernode,
factory *net.ClientFactory,
req *supernodeservice.CascadeSupernodeDownloadRequest,
Expand All @@ -169,6 +169,10 @@ func (t *CascadeDownloadTask) attemptDownload(
defer client.Close(ctx)

req.EventLogger = func(ctx context.Context, evt event.EventType, msg string, data event.EventData) {
if data == nil {
data = make(event.EventData)
}
data[event.KeyIteration] = iteration
t.LogEvent(ctx, evt, msg, data)
}

Expand Down