Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cmd/workload/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,18 @@ func Command() *cli.Command {
},
},
},
{
Name: "sendlarge",
Usage: "send single large file to workload(s)",
ArgsUsage: sendArgsUsage,
Action: utils.ExitCoder(cmdWorkloadSendLarge),
Flags: []cli.Flag{
&cli.StringSliceFlag{
Name: "file",
Usage: "copy local file to workload, only can use single time. src_path:dst_path[:mode[:uid:gid]]",
},
},
},
{
Name: "dissociate",
Usage: "dissociate workload(s) from eru, return it resource but not remove it",
Expand Down
123 changes: 123 additions & 0 deletions cmd/workload/sendlarge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package workload

import (
"context"
"fmt"
"io"
"sync"

"github.com/projecteru2/cli/cmd/utils"
corepb "github.com/projecteru2/core/rpc/gen"
"github.com/projecteru2/core/types"

"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)

type sendLargeWorkloadsOptions struct {
client corepb.CoreRPCClient
// workload ids
ids []string
dst string
content []byte
modes *corepb.FileMode
owners *corepb.FileOwner
}

func (o *sendLargeWorkloadsOptions) run(ctx context.Context) error {
stream, err := o.client.SendLargeFile(ctx)
if err != nil {
logrus.Errorf("[SendLarge] Failed send %s", o.dst)
return err
}

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
msg, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return
}

if msg.Error != "" {
logrus.Errorf("[SendLarge] Failed send %s to %s", msg.Path, msg.Id)
} else {
logrus.Infof("[SendLarge] Send %s to %s success", msg.Path, msg.Id)
}
}
}()

fileOptions := o.toSendLargeFileChunks()
for _, chunk := range fileOptions {
err := stream.Send(chunk)
if err != nil {
logrus.Errorf("[SendLarge] Failed send %s", chunk.Dst)
return err
}
}
stream.CloseSend()
wg.Wait()
return nil
}

func (o *sendLargeWorkloadsOptions) toSendLargeFileChunks() []*corepb.FileOptions {
maxChunkSize := types.SendLargeFileChunkSize
ret := make([]*corepb.FileOptions, 0)
for idx := 0; idx < len(o.content); idx += maxChunkSize {
fileOption := &corepb.FileOptions{
Ids: o.ids,
Dst: o.dst,
Size: int64(len(o.content)),
Mode: o.modes,
Owner: o.owners,
}
if idx+maxChunkSize > len(o.content) {
fileOption.Chunk = o.content[idx:]
} else {
fileOption.Chunk = o.content[idx : idx+maxChunkSize]
}
ret = append(ret, fileOption)
}
return ret
}

func cmdWorkloadSendLarge(c *cli.Context) error {
client, err := utils.NewCoreRPCClient(c)
if err != nil {
return err
}

content, modes, owners := utils.GenerateFileOptions(c)
if len(content) == 0 {
return fmt.Errorf("files should not be empty")
}
if len(content) >= 2 {
return fmt.Errorf("can not send multiple files at the same time")
}

ids := c.Args().Slice()
if len(ids) == 0 {
return fmt.Errorf("Workload ID(s) should not be empty")
}

targetFileName := func() string {
for key := range content {
return key
}
return ""
}()
o := &sendLargeWorkloadsOptions{
client: client,
ids: ids,
dst: targetFileName,
content: content[targetFileName],
modes: modes[targetFileName],
owners: owners[targetFileName],
}
return o.run(c.Context)
}