From c3cca7e960d42fd6a080025caf077dd72e049606 Mon Sep 17 00:00:00 2001 From: hongji Date: Wed, 12 Jul 2023 23:37:26 +0800 Subject: [PATCH 1/4] finish sendlarge on cli --- cmd/workload/cmd.go | 12 ++++ cmd/workload/sendlarge.go | 116 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 128 insertions(+) create mode 100644 cmd/workload/sendlarge.go diff --git a/cmd/workload/cmd.go b/cmd/workload/cmd.go index 481235f..1912806 100644 --- a/cmd/workload/cmd.go +++ b/cmd/workload/cmd.go @@ -216,6 +216,18 @@ func Command() *cli.Command { }, }, }, + { + Name: "sendlarge", + Usage: "send single large file to workload(s)", + ArgsUsage: sendArgsUsage, + Action: utils.ExitCoder(cmdWorkloadSendLarge), + Flags: []cli.Flag{ + &cli.StringSliceFlag{ + Name: "file", + Usage: "copy local files to workload, only can use single time. src_path:dst_path[:mode[:uid:gid]]", + }, + }, + }, { Name: "dissociate", Usage: "dissociate workload(s) from eru, return it resource but not remove it", diff --git a/cmd/workload/sendlarge.go b/cmd/workload/sendlarge.go new file mode 100644 index 0000000..4099577 --- /dev/null +++ b/cmd/workload/sendlarge.go @@ -0,0 +1,116 @@ +package workload + +import ( + "context" + "fmt" + "io" + + "github.com/projecteru2/cli/cmd/utils" + corepb "github.com/projecteru2/core/rpc/gen" + + "github.com/sirupsen/logrus" + "github.com/urfave/cli/v2" +) + +type sendLargeWorkloadsOptions struct { + client corepb.CoreRPCClient + // workload ids + ids []string + dst string + content []byte + modes *corepb.FileMode + owners *corepb.FileOwner +} + +func (o *sendLargeWorkloadsOptions) run(ctx context.Context) error { + stream, err := o.client.SendLargeFile(ctx) + if err != nil { + logrus.Errorf("[SendLarge] Failed send %s", o.dst) + return err + } + + go func() { + for { + msg, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + return + } + + if msg.Error != "" { + logrus.Errorf("[SendLarge] Failed send %s to %s", msg.Path, msg.Id) + } else { + logrus.Infof("[SendLarge] Send %s to %s success", msg.Path, msg.Id) + } + } + }() + + fileOptions := o.toSendLargeFileChunks() + for _, chunk := range fileOptions { + err := stream.Send(chunk) + if err != nil { + logrus.Errorf("[SendLarge] Failed send %s", chunk.Dst) + return err + } + } + return nil +} + +func (o *sendLargeWorkloadsOptions) toSendLargeFileChunks() []*corepb.FileOptions { + const maxChunkSize = 2 << 10 + ret := make([]*corepb.FileOptions, 0) + for idx := 0; idx < len(o.content); idx += maxChunkSize { + fileOption := &corepb.FileOptions{ + Ids: o.ids, + Dst: o.dst, + Size: int64(len(o.content)), + Mode: o.modes, + Owner: o.owners, + } + if idx+maxChunkSize > len(o.content) { + fileOption.Chunk = o.content[idx:] + } else { + fileOption.Chunk = o.content[idx : idx+maxChunkSize] + } + ret = append(ret, fileOption) + } + return ret +} + +func cmdWorkloadSendLarge(c *cli.Context) error { + client, err := utils.NewCoreRPCClient(c) + if err != nil { + return err + } + + content, modes, owners := utils.GenerateFileOptions(c) + if len(content) == 0 { + return fmt.Errorf("files should not be empty") + } + if len(content) >= 2 { + return fmt.Errorf("can not send multiple files at the same time") + } + + ids := c.Args().Slice() + if len(ids) == 0 { + return fmt.Errorf("Workload ID(s) should not be empty") + } + + targetFileName := func() string { + for key := range content { + return key + } + return "" + }() + o := &sendLargeWorkloadsOptions{ + client: client, + ids: ids, + dst: targetFileName, + content: content[targetFileName], + modes: modes[targetFileName], + owners: owners[targetFileName], + } + return o.run(c.Context) +} From 18aa8e9a6a1225faecd02fcc2dafa3fd335a7ea9 Mon Sep 17 00:00:00 2001 From: hongji Date: Wed, 12 Jul 2023 23:55:25 +0800 Subject: [PATCH 2/4] use chunksize from core --- cmd/workload/sendlarge.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/workload/sendlarge.go b/cmd/workload/sendlarge.go index 4099577..4327d09 100644 --- a/cmd/workload/sendlarge.go +++ b/cmd/workload/sendlarge.go @@ -7,6 +7,7 @@ import ( "github.com/projecteru2/cli/cmd/utils" corepb "github.com/projecteru2/core/rpc/gen" + "github.com/projecteru2/core/types" "github.com/sirupsen/logrus" "github.com/urfave/cli/v2" @@ -59,7 +60,7 @@ func (o *sendLargeWorkloadsOptions) run(ctx context.Context) error { } func (o *sendLargeWorkloadsOptions) toSendLargeFileChunks() []*corepb.FileOptions { - const maxChunkSize = 2 << 10 + maxChunkSize := types.SendLargeFileChunkSize ret := make([]*corepb.FileOptions, 0) for idx := 0; idx < len(o.content); idx += maxChunkSize { fileOption := &corepb.FileOptions{ From fd0bcd35bcc94f9b7eb3474f70c1682a94efd100 Mon Sep 17 00:00:00 2001 From: hongji Date: Wed, 19 Jul 2023 16:42:15 +0800 Subject: [PATCH 3/4] add close send --- cmd/workload/sendlarge.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cmd/workload/sendlarge.go b/cmd/workload/sendlarge.go index 4327d09..5dfde13 100644 --- a/cmd/workload/sendlarge.go +++ b/cmd/workload/sendlarge.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "sync" "github.com/projecteru2/cli/cmd/utils" corepb "github.com/projecteru2/core/rpc/gen" @@ -30,7 +31,10 @@ func (o *sendLargeWorkloadsOptions) run(ctx context.Context) error { return err } + wg := sync.WaitGroup{} + wg.Add(1) go func() { + defer wg.Done() for { msg, err := stream.Recv() if err == io.EOF { @@ -56,6 +60,8 @@ func (o *sendLargeWorkloadsOptions) run(ctx context.Context) error { return err } } + stream.CloseSend() + wg.Wait() return nil } From 03eda5f19e38b450779561c4dd3c61cf2d50590b Mon Sep 17 00:00:00 2001 From: hongji Date: Thu, 20 Jul 2023 15:00:26 +0800 Subject: [PATCH 4/4] modify usage --- cmd/workload/cmd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/workload/cmd.go b/cmd/workload/cmd.go index 1912806..9c43d8a 100644 --- a/cmd/workload/cmd.go +++ b/cmd/workload/cmd.go @@ -224,7 +224,7 @@ func Command() *cli.Command { Flags: []cli.Flag{ &cli.StringSliceFlag{ Name: "file", - Usage: "copy local files to workload, only can use single time. src_path:dst_path[:mode[:uid:gid]]", + Usage: "copy local file to workload, only can use single time. src_path:dst_path[:mode[:uid:gid]]", }, }, },