Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/mattn/go-colorable v0.1.2 // indirect
github.com/mattn/go-runewidth v0.0.4 // indirect
github.com/olekukonko/tablewriter v0.0.1
github.com/pkg/errors v0.8.1 // indirect
github.com/pkg/errors v0.8.1
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.3
github.com/stretchr/objx v0.2.0 // indirect
Expand Down
29 changes: 22 additions & 7 deletions pkg/cmdutils/cmdutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmdutils

import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
Expand All @@ -17,10 +18,10 @@ const IncompatibleFlags = "cannot be used at the same time"
// NewVerbCmd defines a standard resource command
func NewResourceCmd(use, short, long string, aliases ...string) *cobra.Command {
return &cobra.Command{
Use: use,
Short: short,
Long: long,
Aliases: aliases,
Use: use,
Short: short,
Long: long,
Aliases: aliases,
Run: func(cmd *cobra.Command, _ []string) {
if err := cmd.Help(); err != nil {
logger.Debug("ignoring error %q", err.Error())
Expand All @@ -29,18 +30,33 @@ func NewResourceCmd(use, short, long string, aliases ...string) *cobra.Command {
}
}

var CheckNameArgError = defaultNameArgsError

var defaultNameArgsError = func(err error) {
os.Exit(1)
}

// GetNameArg tests to ensure there is only 1 name argument
func GetNameArg(args []string) string {
if len(args) > 1 || len(args) == 0 {
logger.Critical("only one argument is allowed to be used as a name")
os.Exit(1)
CheckNameArgError(errors.New("only one argument is allowed to be used as a name"))
}
if len(args) == 1 {
return strings.TrimSpace(args[0])
}
return ""
}

func GetNameArgs(args []string, check func(args []string) error) []string {
err := check(args)
if err != nil {
logger.Critical(err.Error())
CheckNameArgError(err)
}
return args
}

func NewPulsarClient() pulsar.Client {
return PulsarCtlConfig.Client(pulsar.V2)
}
Expand All @@ -58,12 +74,11 @@ func PrintJson(w io.Writer, obj interface{}) {
fmt.Fprintln(w, string(b))
}


func PrintError(w io.Writer, err error) {
msg := err.Error()
if pulsar.IsAdminError(err) {
ae, _ := err.(pulsar.Error)
msg = ae.Reason
}
fmt.Fprintln(w, "error:", msg)
}
}
30 changes: 22 additions & 8 deletions pkg/cmdutils/verb.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ import (

// VerbCmd holds attributes that most of the commands use
type VerbCmd struct {
Command *cobra.Command
FlagSetGroup *NamedFlagSetGroup
NameArg string
Command *cobra.Command
FlagSetGroup *NamedFlagSetGroup
NameArg string
NameArgs []string
}

// AddVerbCmd create a registers a new command under the given resource command
func AddVerbCmd(flagGrouping *FlagGrouping, parentResourceCmd *cobra.Command, newVerbCmd func(*VerbCmd)) {
verb := &VerbCmd {
Command: &cobra.Command{},
verb := &VerbCmd{
Command: &cobra.Command{},
}
verb.FlagSetGroup = flagGrouping.New(verb.Command)
newVerbCmd(verb)
Expand All @@ -28,7 +29,7 @@ func AddVerbCmd(flagGrouping *FlagGrouping, parentResourceCmd *cobra.Command, ne
func (vc *VerbCmd) SetDescription(use, short, long string, aliases ...string) {
vc.Command.Use = use
vc.Command.Short = short
vc.Command.Long = long
vc.Command.Long = long
vc.Command.Aliases = aliases
}

Expand All @@ -47,9 +48,22 @@ func (vc *VerbCmd) SetRunFuncWithNameArg(cmd func() error) {
}
}

func (vc *VerbCmd) SetRunFuncWithNameArgs(cmd func() error, checkArgs func(args []string) error) {
vc.Command.Run = func(_ *cobra.Command, args []string) {
vc.NameArgs = GetNameArgs(args, checkArgs)
run(cmd)
}
}

var ExecErrorHandler = defaultExecErrorHandler

var defaultExecErrorHandler = func(err error) {
logger.Critical("%s\n", err.Error())
os.Exit(1)
}

func run(cmd func() error) {
if err := cmd(); err != nil {
logger.Critical("%s\n", err.Error())
os.Exit(1)
ExecErrorHandler(err)
}
}
1 change: 1 addition & 0 deletions pkg/ctl/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, updateClusterCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, updatePeerClustersCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getPeerClustersCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, createFailureDomainCmd)

return resourceCmd
}
94 changes: 94 additions & 0 deletions pkg/ctl/cluster/create_failure_domain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package cluster

import (
"errors"
"github.com/spf13/pflag"
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar"
)

func createFailureDomainCmd(vc *cmdutils.VerbCmd) {
var desc pulsar.LongDescription
desc.CommandUsedFor = "This command is used for creating a failure domain of the <cluster-name>."
desc.CommandPermission = "This command requires super-user permissions."

var examples []pulsar.Example
create := pulsar.Example{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does Pulsar allow creating a failure domain without a broker list?

I don't think that is a valid case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. I will check the broker list in pulsarctl.

Using pulsar-admin

bin/pulsar-admin clusters create-failure-domain --domain-name hello standalone

bin/pulsar-admin clusters list-failure-domains standalone
{
  "hello" : {
    "brokers" : [ ]
  }
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Desc: "create the failure domain",
Command: "pulsarctl clusters create-failure-domain <cluster-name> <domain-name>",
}
examples = append(examples, create)

createWithBrokers := pulsar.Example{
Desc: "create the failure domain with brokers",
Command: "pulsarctl clusters create-failure-domain" +
" --broker-list <cluster-A> --broker-list <cluster-B> <cluster-name> <domain-name>",
}
examples = append(examples, createWithBrokers)
desc.CommandExamples = examples

var out []pulsar.Output
successOut := pulsar.Output{
Desc: "normal output",
Out: "Create failure domain <domain-name> for cluster <cluster-name> succeed",
}
out = append(out, successOut)

argsErrorOut := pulsar.Output{
Desc: "the args need to be specified as <cluster-name> <domain-name>",
Out: "[✖] need specified two names for cluster and failure domain",
}
out = append(out, argsErrorOut)
out = append(out, clusterNonExist)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clusterNonExist is for cluster commands. please make sure the error message is correct.

desc.CommandOutput = out

vc.SetDescription(
"create-failure-domain",
"Create a failure domain",
desc.ToString(),
"cfd")

var failureDomainData pulsar.FailureDomainData

vc.SetRunFuncWithNameArg(func() error {
return doCreateFailureDomain(vc, &failureDomainData)
})

checkArgs := func(args []string) error {
if len(args) != 2 {
return errors.New("need to specified two names for cluster and failure domain")
}
return nil
}

vc.SetRunFuncWithNameArgs(func() error {
return doCreateFailureDomain(vc, &failureDomainData)
}, checkArgs)

vc.FlagSetGroup.InFlagSet("FailureDomainData", func(set *pflag.FlagSet) {
set.StringSliceVarP(
&failureDomainData.BrokerList,
"broker-list",
"b",
nil,
"Set the failure domain clusters")
})
}

func doCreateFailureDomain(vc *cmdutils.VerbCmd, failureDomain *pulsar.FailureDomainData) error {
failureDomain.ClusterName = vc.NameArgs[0]
failureDomain.DomainName = vc.NameArgs[1]

if len(failureDomain.BrokerList) == 0 || failureDomain.BrokerList == nil {
return errors.New("broker list must be specified")
}

admin := cmdutils.NewPulsarClient()
err := admin.Clusters().CreateFailureDomain(*failureDomain)
if err == nil {
vc.Command.Printf(
"Create failure domain [%s] for cluster [%s] succeed\n",
failureDomain.DomainName, failureDomain.ClusterName)
}
return err
}
20 changes: 20 additions & 0 deletions pkg/ctl/cluster/create_failure_domain_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package cluster

import (
"github.com/stretchr/testify/assert"
"testing"
)

func TestCreateFailureDomainCmdSuccess(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add test cases for failure cases.

args := []string{"create-failure-domain", "-b", "cluster-A", "standalone", "standalone-failure-domain"}
_, execErr, NameErr, err := TestClusterCommands(createFailureDomainCmd, args)
assert.Nil(t, execErr)
assert.Nil(t, NameErr)
assert.Nil(t, err)
}

func TestCreateFailureDomainCmdBrokerListError(t *testing.T) {
args := []string{"create-failure-domain", "standalone", "standalone-failure-domain"}
_, execErr, _, _ := TestClusterCommands(createFailureDomainCmd, args)
assert.Equal(t, "broker list must be specified", execErr.Error())
}
8 changes: 4 additions & 4 deletions pkg/ctl/cluster/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@ import (

func TestDeleteClusterCmd(t *testing.T) {
args := []string{"add", "test"}
_, err := TestClusterCommands(createClusterCmd, args)
_, _, _, err := TestClusterCommands(createClusterCmd, args)
assert.Nil(t, err)

args = []string{"list"}
out, err := TestClusterCommands(listClustersCmd, args)
out, _, _, err := TestClusterCommands(listClustersCmd, args)
assert.Nil(t, err)
clusters := out.String()
assert.True(t, strings.Contains(clusters, "test"))

args = []string{"delete", "test"}
_, err = TestClusterCommands(deleteClusterCmd, args)
_, _, _, err = TestClusterCommands(deleteClusterCmd, args)
assert.Nil(t, err)

args = []string{"list"}
out, err = TestClusterCommands(listClustersCmd, args)
out, _, _, err = TestClusterCommands(listClustersCmd, args)
assert.Nil(t, err)
clusters = out.String()
assert.False(t, strings.Contains(clusters, "test"))
Expand Down
4 changes: 2 additions & 2 deletions pkg/ctl/cluster/get_peer_clusters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (

func TestGetPeerClustersCmd(t *testing.T) {
args := []string{"add", "test_get_peer", "--peer-cluster", "standalone"}
_, err := TestClusterCommands(createClusterCmd, args)
_, _, _, err := TestClusterCommands(createClusterCmd, args)
if err != nil {
t.Fatal(err)
}

args = []string{"gpc", "test_get_peer"}
out, err := TestClusterCommands(getPeerClustersCmd, args)
out, _, _, err := TestClusterCommands(getPeerClustersCmd, args)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ctl/cluster/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

func TestGetClusterData(t *testing.T) {
args := []string{"get", "standalone"}
out, err := TestClusterCommands(getClusterDataCmd, args)
out, _, _, err := TestClusterCommands(getClusterDataCmd, args)
if err != nil {
t.Error(err)
}
Expand Down
15 changes: 13 additions & 2 deletions pkg/ctl/cluster/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,18 @@ import (
"os"
)

func TestClusterCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (out *bytes.Buffer, err error) {
func TestClusterCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (out *bytes.Buffer, execErr, nameErr, err error) {

var execError error
cmdutils.ExecErrorHandler = func(err error) {
execError = err
}

var nameError error
cmdutils.CheckNameArgError = func(err error) {
nameError = err
}

var rootCmd = &cobra.Command {
Use: "pulsarctl [command]",
Short: "a CLI for Apache Pulsar",
Expand All @@ -34,7 +45,7 @@ func TestClusterCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (ou
rootCmd.AddCommand(resourceCmd)
err = rootCmd.Execute()

return buf, err
return buf, execError, nameError, err
}

var (
Expand Down
6 changes: 3 additions & 3 deletions pkg/ctl/cluster/update_peer_clusters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ import (

func TestUpdatePeerClusters(t *testing.T) {
args := []string{"add", "test_peer_cluster"}
_, err := TestClusterCommands(createClusterCmd, args)
_, _, _, err := TestClusterCommands(createClusterCmd, args)
if err != nil {
t.Fatal(err)
}

args = []string{"update-peer-clusters", "-p", "test_peer_cluster", "standalone"}
_, err = TestClusterCommands(updatePeerClustersCmd, args)
_, _, _, err = TestClusterCommands(updatePeerClustersCmd, args)
if err != nil {
t.Fatal(err)
}

args = []string{"get", "standalone"}
out, err := TestClusterCommands(getClusterDataCmd, args)
out, _, _, err := TestClusterCommands(getClusterDataCmd, args)

var clusterData pulsar.ClusterData
err = json.Unmarshal(out.Bytes(), &clusterData)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ctl/cluster/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ func TestUpdateCluster(t *testing.T) {
"standalone",
}

_, err := TestClusterCommands(updateClusterCmd, args)
_, _, _, err := TestClusterCommands(updateClusterCmd, args)
if err != nil {
t.Error(err)
}

args = []string{"get", "standalone"}
out, err := TestClusterCommands(getClusterDataCmd, args)
out, _, _, err := TestClusterCommands(getClusterDataCmd, args)

var data pulsar.ClusterData
err = json.Unmarshal(out.Bytes(), &data)
Expand Down
1 change: 1 addition & 0 deletions pkg/ctl/functions/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
)

cmdutils.AddVerbCmd(flagGrouping, resourceCmd, createFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, stopFunctionsCmd)

return resourceCmd
}
Loading