Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.15
go-version: 1.17

- name: Run GoReleaser
uses: goreleaser/goreleaser-action@v2
Expand Down
2 changes: 1 addition & 1 deletion cmd/node/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (o *addNodeOptions) run(ctx context.Context) error {
return err
}

describe.Nodes(node)
describe.Nodes(describe.ToNodeChan(node), false)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/node/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (o *getNodeOptions) run(ctx context.Context) error {
return err
}

describe.NodesWithInfo(node)
describe.NodesWithInfo(describe.ToNodeChan(node), false)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/node/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (o *nodeResourceOptions) run(ctx context.Context) error {
return err
}

describe.NodeResources(resource)
describe.NodeResources(describe.ToNodeResourceChan(resource), false)
return nil
}

Expand Down
8 changes: 8 additions & 0 deletions cmd/pod/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func Command() *cli.Command {
Usage: "filter resource value, can be cpu/memory/storage/volume </<=/>/>=/== 40% or 0.4",
Value: "all",
},
&cli.BoolFlag{
Name: "stream",
Usage: "fetch streaming data",
},
},
},
{
Expand Down Expand Up @@ -121,6 +125,10 @@ func Command() *cli.Command {
Name: "show-info",
Usage: "show node info",
},
&cli.BoolFlag{
Name: "stream",
Usage: "fetch streaming data",
},
},
},
{
Expand Down
63 changes: 50 additions & 13 deletions cmd/pod/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pod

import (
"context"
"io"
"strings"

"github.com/projecteru2/cli/cmd/utils"
Expand All @@ -25,6 +26,7 @@ type listPodNodesOptions struct {
labels map[string]string
timeoutInSecond int32
showInfo bool
stream bool
}

func (o *listPodNodesOptions) run(ctx context.Context) error {
Expand All @@ -40,6 +42,7 @@ func (o *listPodNodesOptions) listDown(ctx context.Context) error {
All: true,
Labels: o.labels,
TimeoutInSecond: o.timeoutInSecond,
SkipInfo: !o.showInfo,
})
if err != nil {
return err
Expand All @@ -50,6 +53,7 @@ func (o *listPodNodesOptions) listDown(ctx context.Context) error {
All: false,
Labels: o.labels,
TimeoutInSecond: o.timeoutInSecond,
SkipInfo: !o.showInfo,
})
if err != nil {
return err
Expand All @@ -68,32 +72,64 @@ func (o *listPodNodesOptions) listDown(ctx context.Context) error {
nodes = append(nodes, node)
}

o.describeNodes(nodes)
o.describeNodes(describe.ToNodeChan(nodes...), o.stream)
return nil
}

func (o *listPodNodesOptions) listUpOrAll(ctx context.Context) error {
// filter == all, list all nodes
// filter == up, list available nodes only
resp, err := o.client.ListPodNodes(ctx, &corepb.ListNodesOptions{
Podname: o.name,
All: o.filter == all,
Labels: o.labels,
TimeoutInSecond: o.timeoutInSecond,
})
if err != nil {
return err
var ch chan *corepb.Node
if o.stream { // nolint
resp, err := o.client.PodNodesStream(ctx, &corepb.ListNodesOptions{
Podname: o.name,
All: o.filter == all,
Labels: o.labels,
TimeoutInSecond: o.timeoutInSecond,
SkipInfo: !o.showInfo,
})
if err != nil {
return err
}
ch = make(chan *corepb.Node)
go func() {
defer close(ch)
for {
node, err := resp.Recv()
if err != nil {
if err != io.EOF {
println(err.Error())
}
return
}
ch <- node
}
}()

} else {
resp, err := o.client.ListPodNodes(ctx, &corepb.ListNodesOptions{
Podname: o.name,
All: o.filter == all,
Labels: o.labels,
TimeoutInSecond: o.timeoutInSecond,
SkipInfo: !o.showInfo,
})
if err != nil {
return err
}
ch = describe.ToNodeChan(resp.GetNodes()...)

}

o.describeNodes(resp.GetNodes())
o.describeNodes(ch, o.stream)
return nil
}

func (o *listPodNodesOptions) describeNodes(nodes []*corepb.Node) {
func (o *listPodNodesOptions) describeNodes(nodes chan *corepb.Node, stream bool) {
if o.showInfo {
describe.NodesWithInfo(nodes...)
describe.NodesWithInfo(nodes, stream)
} else {
describe.Nodes(nodes...)
describe.Nodes(nodes, stream)
}
}

Expand All @@ -115,6 +151,7 @@ func cmdPodListNodes(c *cli.Context) error {
labels: utils.SplitEquality(c.StringSlice("label")),
timeoutInSecond: int32(c.Int("timeout")),
showInfo: c.Bool("show-info"),
stream: c.Bool("stream"),
}
return o.run(c.Context)
}
64 changes: 48 additions & 16 deletions cmd/pod/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pod

import (
"context"
"io"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -63,12 +64,13 @@ type resourcePodOptions struct {
client corepb.CoreRPCClient
name string
expr string
stream bool
}

func (o *resourcePodOptions) filter(nrs []*corepb.NodeResource) ([]*corepb.NodeResource, error) {
func (o *resourcePodOptions) filter(ch chan *corepb.NodeResource) (chan *corepb.NodeResource, error) {
filter := match(o.expr)
if len(filter) == 0 {
return nrs, nil
return ch, nil
}

var (
Expand All @@ -88,31 +90,60 @@ func (o *resourcePodOptions) filter(nrs []*corepb.NodeResource) ([]*corepb.NodeR
v /= 100
}

rv := []*corepb.NodeResource{}
for _, nr := range nrs {
l := attr(nr, filter["name"])
if !op(filter["op"], l, v) {
continue
rv := make(chan *corepb.NodeResource)
go func() {
defer close(rv)
for nr := range ch {
l := attr(nr, filter["name"])
if !op(filter["op"], l, v) {
continue
}
rv <- nr
}
rv = append(rv, nr)
}
}()
return rv, nil
}

func (o *resourcePodOptions) run(ctx context.Context) error {
resp, err := o.client.GetPodResource(ctx, &corepb.GetPodOptions{
Name: o.name,
})
if err != nil {
return err
var ch chan *corepb.NodeResource
if o.stream { // nolint
resp, err := o.client.PodResourceStream(ctx, &corepb.GetPodOptions{
Name: o.name,
})
if err != nil {
return err
}

ch = make(chan *corepb.NodeResource)
go func() {
defer close(ch)
for {
resource, err := resp.Recv()
if err != nil {
if err != io.EOF {
println(err.Error())
}
return
}
ch <- resource
}
}()
} else {
resp, err := o.client.GetPodResource(ctx, &corepb.GetPodOptions{
Name: o.name,
})
if err != nil {
return err
}
ch = describe.ToNodeResourceChan(resp.NodesResource...)
}

nrs, err := o.filter(resp.NodesResource)
ch, err := o.filter(ch)
if err != nil {
return err
}

describe.NodeResources(nrs...)
describe.NodeResources(ch, o.stream)
return nil
}

Expand All @@ -131,6 +162,7 @@ func cmdPodResource(c *cli.Context) error {
client: client,
name: name,
expr: c.String("filter"),
stream: c.Bool("stream"),
}
return o.run(c.Context)
}
Loading