From b4fa7ebaea4357fee60d2a3c9b6da9b887b52eae Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Thu, 29 Aug 2019 17:31:49 +0800 Subject: [PATCH 01/17] Add command cluster `get-peer-clusters` --- Master Issue: #2 --- pkg/pulsar/cluster.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/pulsar/cluster.go b/pkg/pulsar/cluster.go index 9465a9e9..8cff24b2 100644 --- a/pkg/pulsar/cluster.go +++ b/pkg/pulsar/cluster.go @@ -51,14 +51,14 @@ func (c *clusters) Update(cdata ClusterData) error { endpoint := c.client.endpoint(c.basePath, cdata.Name) return c.client.post(endpoint, &cdata, nil) } - -func (c *clusters) UpdatePeerClusters(cluster string, peerClusters []string) error { - endpoint := c.client.endpoint(c.basePath, cluster, "peers") - return c.client.post(endpoint, peerClusters, nil) -} func (c *clusters) GetPeerClusters(name string) ([]string, error) { var peerClusters []string endpoint := c.client.endpoint(c.basePath, name, "peers") err := c.client.get(endpoint, &peerClusters) return peerClusters, err } + +func (c *clusters) UpdatePeerClusters(cluster string, peerClusters []string) error { + endpoint := c.client.endpoint(c.basePath, cluster, "peers") + return c.client.post(endpoint, peerClusters, nil) +} From 18e642053a9b2b2c094716cf433501bd5b902198 Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Fri, 30 Aug 2019 15:02:24 +0800 Subject: [PATCH 02/17] Add command cluster `create-failure-domain` --- pkg/ctl/cluster/cluster.go | 1 + pkg/ctl/cluster/create_failure_domain.go | 75 +++++++++++++++++++ pkg/ctl/cluster/create_failure_domain_test.go | 12 +++ pkg/pulsar/cluster.go | 5 ++ pkg/pulsar/data.go | 7 ++ 5 files changed, 100 insertions(+) create mode 100644 pkg/ctl/cluster/create_failure_domain.go create mode 100644 pkg/ctl/cluster/create_failure_domain_test.go diff --git a/pkg/ctl/cluster/cluster.go b/pkg/ctl/cluster/cluster.go index f57df01a..c8ae3593 100644 --- a/pkg/ctl/cluster/cluster.go +++ b/pkg/ctl/cluster/cluster.go @@ -30,6 +30,7 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command { cmdutils.AddVerbCmd(flagGrouping, resourceCmd, updateClusterCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, updatePeerClustersCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getPeerClustersCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, createFailureDomainCmd) return resourceCmd } diff --git a/pkg/ctl/cluster/create_failure_domain.go b/pkg/ctl/cluster/create_failure_domain.go new file mode 100644 index 00000000..5294f7c5 --- /dev/null +++ b/pkg/ctl/cluster/create_failure_domain.go @@ -0,0 +1,75 @@ +package cluster + +import ( + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" +) + +func createFailureDomainCmd(vc *cmdutils.VerbCmd) { + var desc pulsar.LongDescription + desc.CommandUsedFor = "This command is used for creating a failure domain of the ." + desc.CommandPermission = "This command requires super-user permissions." + + var examples []pulsar.Example + create := pulsar.Example{ + Desc: "creating the failure domain", + Command: "pulsarctl clusters create-failure-domain --domain-name ", + } + examples = append(examples, create) + + createWithBrokers := pulsar.Example{ + Desc: "creteing the failure domain with brokers", + Command: "pulsarctl clusters create-failure-domain --domain-name --broker-list , ", + } + examples = append(examples, createWithBrokers) + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normao output", + Out: "Create failure domain for cluster succeed", + } + out = append(out, successOut) + out = append(out, argsError) + out = append(out, clusterNonExist) + desc.CommandOutput = out + + vc.SetDescription( + "create-failure-domain", + "Create a failure domain", + desc.ToString(), + "cfd") + + var failureDomainData pulsar.FailureDomainData + + vc.SetRunFuncWithNameArg(func() error { + return doCreateFailureDomain(vc, &failureDomainData) + }) + + vc.FlagSetGroup.InFlagSet("FailureDomainData", func(set *pflag.FlagSet) { + set.StringVar( + &failureDomainData.DomainName, + "domain-name", + "", + "The failure domain name") + set.StringArrayVarP( + &failureDomainData.BrokerList, + "broker-list", + "b", + []string{""}, + "Set the failure domain clusters") + }) +} + +func doCreateFailureDomain(vc *cmdutils.VerbCmd, failureDomain *pulsar.FailureDomainData) error { + failureDomain.ClusterName = vc.NameArg + admin := cmdutils.NewPulsarClient() + err := admin.Clusters().CreateFailureDomain(*failureDomain) + if err == nil { + vc.Command.Printf( + "Create failure domain [%s] for cluster [%s] succeed\n", + failureDomain.DomainName, failureDomain.ClusterName) + } + return err +} diff --git a/pkg/ctl/cluster/create_failure_domain_test.go b/pkg/ctl/cluster/create_failure_domain_test.go new file mode 100644 index 00000000..8b5a9745 --- /dev/null +++ b/pkg/ctl/cluster/create_failure_domain_test.go @@ -0,0 +1,12 @@ +package cluster + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestCreateFailureDomainCmdSuccess(t *testing.T) { + args := []string{"cfd", "--domain-name", "test-domain", "standalone"} + _, err := TestClusterCommands(createFailureDomainCmd, args) + assert.Nil(t, err) +} diff --git a/pkg/pulsar/cluster.go b/pkg/pulsar/cluster.go index 8cff24b2..0d87a29d 100644 --- a/pkg/pulsar/cluster.go +++ b/pkg/pulsar/cluster.go @@ -10,6 +10,7 @@ type Clusters interface { Update(ClusterData) error UpdatePeerClusters(string, []string) error GetPeerClusters(string) ([]string, error) + CreateFailureDomain(FailureDomainData) error } type clusters struct { @@ -62,3 +63,7 @@ func (c *clusters) UpdatePeerClusters(cluster string, peerClusters []string) err endpoint := c.client.endpoint(c.basePath, cluster, "peers") return c.client.post(endpoint, peerClusters, nil) } +func (c *clusters) CreateFailureDomain(data FailureDomainData) error { + endpoint := c.client.endpoint(c.basePath, data.ClusterName, "failureDomains", data.DomainName) + return c.client.post(endpoint, &data, nil) +} diff --git a/pkg/pulsar/data.go b/pkg/pulsar/data.go index dbad0508..a3cb38ab 100644 --- a/pkg/pulsar/data.go +++ b/pkg/pulsar/data.go @@ -49,3 +49,10 @@ type FunctionData struct { FuncConf *FunctionConfig `json:"-"` UserCodeFile string `json:"-"` } + +// Failure Domain information +type FailureDomainData struct { + ClusterName string `json:"-"` + DomainName string `json:"-"` + BrokerList []string `json:"brokerList"` +} From 9904852dc614e7297096f916f1049a6694e7ec96 Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Mon, 2 Sep 2019 12:07:03 +0800 Subject: [PATCH 03/17] Fix typo --- pkg/ctl/cluster/create_failure_domain.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/ctl/cluster/create_failure_domain.go b/pkg/ctl/cluster/create_failure_domain.go index 5294f7c5..8c9d8e0d 100644 --- a/pkg/ctl/cluster/create_failure_domain.go +++ b/pkg/ctl/cluster/create_failure_domain.go @@ -19,7 +19,7 @@ func createFailureDomainCmd(vc *cmdutils.VerbCmd) { examples = append(examples, create) createWithBrokers := pulsar.Example{ - Desc: "creteing the failure domain with brokers", + Desc: "creating the failure domain with brokers", Command: "pulsarctl clusters create-failure-domain --domain-name --broker-list , ", } examples = append(examples, createWithBrokers) @@ -27,7 +27,7 @@ func createFailureDomainCmd(vc *cmdutils.VerbCmd) { var out []pulsar.Output successOut := pulsar.Output{ - Desc: "normao output", + Desc: "normal output", Out: "Create failure domain for cluster succeed", } out = append(out, successOut) From 528e494ede5c37d855dfd6e41f9a80c194f0bc15 Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Tue, 3 Sep 2019 10:45:28 +0800 Subject: [PATCH 04/17] Add an empty line --- pkg/pulsar/cluster.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/pulsar/cluster.go b/pkg/pulsar/cluster.go index 0d87a29d..052bd820 100644 --- a/pkg/pulsar/cluster.go +++ b/pkg/pulsar/cluster.go @@ -63,6 +63,7 @@ func (c *clusters) UpdatePeerClusters(cluster string, peerClusters []string) err endpoint := c.client.endpoint(c.basePath, cluster, "peers") return c.client.post(endpoint, peerClusters, nil) } +  func (c *clusters) CreateFailureDomain(data FailureDomainData) error { endpoint := c.client.endpoint(c.basePath, data.ClusterName, "failureDomains", data.DomainName) return c.client.post(endpoint, &data, nil) From 919d4d0c6d01879219b8ce437213c0537c83f594 Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Tue, 3 Sep 2019 10:49:19 +0800 Subject: [PATCH 05/17] Fix build --- pkg/pulsar/cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pulsar/cluster.go b/pkg/pulsar/cluster.go index 052bd820..a98b8cf3 100644 --- a/pkg/pulsar/cluster.go +++ b/pkg/pulsar/cluster.go @@ -63,7 +63,7 @@ func (c *clusters) UpdatePeerClusters(cluster string, peerClusters []string) err endpoint := c.client.endpoint(c.basePath, cluster, "peers") return c.client.post(endpoint, peerClusters, nil) } -  + func (c *clusters) CreateFailureDomain(data FailureDomainData) error { endpoint := c.client.endpoint(c.basePath, data.ClusterName, "failureDomains", data.DomainName) return c.client.post(endpoint, &data, nil) From b3721e675dedcaf2e1277322ce4904250109456a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=89=E5=B0=8F=E9=BE=99?= Date: Tue, 3 Sep 2019 13:38:48 +0800 Subject: [PATCH 06/17] Add stop cmd for Pulsar Functions (#25) Signed-off-by: xiaolong.ran --- pkg/ctl/functions/functions.go | 1 + pkg/ctl/functions/stop.go | 145 +++++++++++++++++++++++++++++++++ pkg/ctl/functions/stop_test.go | 73 +++++++++++++++++ pkg/ctl/functions/util.go | 34 ++++++++ pkg/pulsar/data.go | 1 + pkg/pulsar/functions.go | 27 ++++++ 6 files changed, 281 insertions(+) create mode 100644 pkg/ctl/functions/stop.go create mode 100644 pkg/ctl/functions/stop_test.go diff --git a/pkg/ctl/functions/functions.go b/pkg/ctl/functions/functions.go index b4b82474..e92891f5 100644 --- a/pkg/ctl/functions/functions.go +++ b/pkg/ctl/functions/functions.go @@ -32,6 +32,7 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command { ) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, createFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, stopFunctionsCmd) return resourceCmd } diff --git a/pkg/ctl/functions/stop.go b/pkg/ctl/functions/stop.go new file mode 100644 index 00000000..1844e67c --- /dev/null +++ b/pkg/ctl/functions/stop.go @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "strconv" +) + +func stopFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "This command is used for stopping function instance." + desc.CommandPermission = "This command requires super-user permissions." + + var examples []pulsar.Example + + stop := pulsar.Example{ + Desc: "Stops function instance", + Command: "pulsarctl functions stop \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name ", + } + examples = append(examples, stop) + + stopWithInstanceID := pulsar.Example{ + Desc: "Stops function instance with instance ID", + Command: "pulsarctl functions stop \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name \n" + + "\t--instance-id 1", + } + examples = append(examples, stopWithInstanceID) + + stopWithFQFN := pulsar.Example{ + Desc: "Stops function instance with FQFN", + Command: "pulsarctl functions stop \n" + + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions]", + } + examples = append(examples, stopWithFQFN) + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normal output", + Out: "Stopped successfully", + } + + out = append(out, successOut) + desc.CommandOutput = out + + vc.SetDescription( + "stop", + "", + desc.ToString(), + "stop", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFunc(func() error { + return doStopFunctions(vc, functionData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.FQFN, + "fqfn", + "", + "The Fully Qualified Function Name (FQFN) for the function") + + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + + flagSet.StringVar( + &functionData.FuncName, + "name", + "", + "The name of a Pulsar Function") + + flagSet.StringVar( + &functionData.InstanceID, + "instance-id", + "", + "The function instanceId (stop all instances if instance-id is not provided)") + }) +} + +func doStopFunctions(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + err := processBaseArguments(funcData) + if err != nil { + vc.Command.Help() + return err + } + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + if funcData.InstanceID != "" { + instanceID, err := strconv.Atoi(funcData.InstanceID) + if err != nil { + return err + } + err = admin.Functions().StopFunctionWithID(funcData.Tenant, funcData.Namespace, funcData.FuncName, instanceID) + if err != nil { + return err + } + vc.Command.Printf("Stopped %s successfully", funcData.FuncName) + } else { + err = admin.Functions().StopFunction(funcData.Tenant, funcData.Namespace, funcData.FuncName) + if err != nil { + return err + } + + vc.Command.Printf("Stopped %s successfully", funcData.FuncName) + } + + return nil +} diff --git a/pkg/ctl/functions/stop_test.go b/pkg/ctl/functions/stop_test.go new file mode 100644 index 00000000..f574cc42 --- /dev/null +++ b/pkg/ctl/functions/stop_test.go @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/stretchr/testify/assert" + "os" + "testing" +) + +func TestStopFunctions(t *testing.T) { + jarName := "dummyExample.jar" + _, err := os.Create(jarName) + assert.Nil(t, err) + + defer os.Remove(jarName) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-stop", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, err = TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + + stopArgs := []string{"stop", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-stop", + } + + _, err = TestFunctionsCommands(stopFunctionsCmd, stopArgs) + assert.Nil(t, err) + + argsFqfn := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-stop-fqfn", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, err = TestFunctionsCommands(createFunctionsCmd, argsFqfn) + assert.Nil(t, err) + + stopArgsFqfn := []string{"stop", + "--fqfn", "public/default/test-functions-stop-fqfn", + } + + _, err = TestFunctionsCommands(stopFunctionsCmd, stopArgsFqfn) + assert.Nil(t, err) +} diff --git a/pkg/ctl/functions/util.go b/pkg/ctl/functions/util.go index 910a4cbc..30756e4b 100644 --- a/pkg/ctl/functions/util.go +++ b/pkg/ctl/functions/util.go @@ -382,3 +382,37 @@ func validateFunctionConfigs(functionConfig *pulsar.FunctionConfig) error { return nil } + +func processBaseArguments(funcData *pulsar.FunctionData) error { + usesSetters := funcData.Tenant != "" || funcData.Namespace != "" || funcData.FuncName != "" + usesFqfn := funcData.FQFN != "" + + // return error if --fqfn is set alongside any combination of --tenant, --namespace, and --name + if usesFqfn && usesSetters { + return errors.New("you must specify either a Fully Qualified Function Name (FQFN) or tenant, namespace, and function name") + } else if usesFqfn { + // If the --fqfn flag is used, parse tenant, namespace, and name using that flag + fqfnParts := strings.Split(funcData.FQFN, "/") + if len(fqfnParts) != 3 { + return errors.New("fully qualified function names (FQFNs) must be of the form tenant/namespace/name") + } + + funcData.Tenant = fqfnParts[0] + funcData.Namespace = fqfnParts[1] + funcData.FuncName = fqfnParts[2] + } else { + if funcData.Tenant == "" { + funcData.Tenant = PublicTenant + } + + if funcData.Namespace == "" { + funcData.Namespace = DefaultNamespace + } + + if funcData.FuncName == "" { + return errors.New("you must specify a name for the function or a Fully Qualified Function Name (FQFN)") + } + } + + return nil +} diff --git a/pkg/pulsar/data.go b/pkg/pulsar/data.go index dbad0508..029d5382 100644 --- a/pkg/pulsar/data.go +++ b/pkg/pulsar/data.go @@ -16,6 +16,7 @@ type FunctionData struct { Tenant string `json:"tenant"` Namespace string `json:"namespace"` FuncName string `json:"functionName"` + InstanceID string `json:"instance_id"` ClassName string `json:"className"` Jar string `json:"jarFile"` Py string `json:"pyFile"` diff --git a/pkg/pulsar/functions.go b/pkg/pulsar/functions.go index db09a619..d5352039 100644 --- a/pkg/pulsar/functions.go +++ b/pkg/pulsar/functions.go @@ -43,6 +43,12 @@ type Functions interface { // @param pkgUrl // url from which pkg can be downloaded CreateFuncWithUrl(data *FunctionConfig, pkgUrl string) error + + // Stop all function instances + StopFunction(tenant, namespace, name string) error + + // Stop function instance + StopFunctionWithID(tenant, namespace, name string, instanceID int) error } type functions struct { @@ -174,3 +180,24 @@ func (f *functions) CreateFuncWithUrl(funcConf *FunctionConfig, pkgUrl string) e return nil } + +func (f *functions) StopFunction(tenant, namespace, name string) error { + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name) + err := f.client.post(endpoint+"/stop", "", nil) + if err != nil { + return err + } + return nil +} + +func (f *functions) StopFunctionWithID(tenant, namespace, name string, instanceID int) error { + id := fmt.Sprintf("%d", instanceID) + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, id) + + err := f.client.post(endpoint+"/stop", "", nil) + if err != nil { + return err + } + + return nil +} From 664934dc8a56c213fac277c53880541d9fa16c8c Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Tue, 3 Sep 2019 14:26:18 +0800 Subject: [PATCH 07/17] Address comments --- go.mod | 2 +- pkg/cmdutils/cmdutils.go | 9 +++++ pkg/cmdutils/verb.go | 8 ++++ pkg/ctl/cluster/create_failure_domain.go | 39 +++++++++++++------ pkg/ctl/cluster/create_failure_domain_test.go | 2 +- 5 files changed, 46 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index c72d83b7..95083551 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/mattn/go-colorable v0.1.2 // indirect github.com/mattn/go-runewidth v0.0.4 // indirect github.com/olekukonko/tablewriter v0.0.1 - github.com/pkg/errors v0.8.1 // indirect + github.com/pkg/errors v0.8.1 github.com/spf13/cobra v0.0.5 github.com/spf13/pflag v1.0.3 github.com/stretchr/objx v0.2.0 // indirect diff --git a/pkg/cmdutils/cmdutils.go b/pkg/cmdutils/cmdutils.go index 18ebfca5..42fe8aba 100644 --- a/pkg/cmdutils/cmdutils.go +++ b/pkg/cmdutils/cmdutils.go @@ -41,6 +41,15 @@ func GetNameArg(args []string) string { return "" } +func GetNameArgs(args []string, check func(args []string) error) []string { + err := check(args) + if err != nil { + logger.Critical(err.Error()) + os.Exit(1) + } + return args +} + func NewPulsarClient() pulsar.Client { return PulsarCtlConfig.Client(pulsar.V2) } diff --git a/pkg/cmdutils/verb.go b/pkg/cmdutils/verb.go index 0be6d8a9..5a67af59 100644 --- a/pkg/cmdutils/verb.go +++ b/pkg/cmdutils/verb.go @@ -11,6 +11,7 @@ type VerbCmd struct { Command *cobra.Command FlagSetGroup *NamedFlagSetGroup NameArg string + NameArgs []string } // AddVerbCmd create a registers a new command under the given resource command @@ -47,6 +48,13 @@ func (vc *VerbCmd) SetRunFuncWithNameArg(cmd func() error) { } } +func (vc *VerbCmd) SetRunFuncWithNameArgs(cmd func() error, checkArgs func(args []string) error) { + vc.Command.Run = func(_ *cobra.Command, args []string) { + vc.NameArgs = GetNameArgs(args, checkArgs) + run(cmd) + } +} + func run(cmd func() error) { if err := cmd(); err != nil { logger.Critical("%s\n", err.Error()) diff --git a/pkg/ctl/cluster/create_failure_domain.go b/pkg/ctl/cluster/create_failure_domain.go index 8c9d8e0d..31885c36 100644 --- a/pkg/ctl/cluster/create_failure_domain.go +++ b/pkg/ctl/cluster/create_failure_domain.go @@ -1,6 +1,7 @@ package cluster import ( + "github.com/pkg/errors" "github.com/spf13/pflag" "github.com/streamnative/pulsarctl/pkg/cmdutils" "github.com/streamnative/pulsarctl/pkg/pulsar" @@ -13,14 +14,15 @@ func createFailureDomainCmd(vc *cmdutils.VerbCmd) { var examples []pulsar.Example create := pulsar.Example{ - Desc: "creating the failure domain", - Command: "pulsarctl clusters create-failure-domain --domain-name ", + Desc: "create the failure domain", + Command: "pulsarctl clusters create-failure-domain ", } examples = append(examples, create) createWithBrokers := pulsar.Example{ - Desc: "creating the failure domain with brokers", - Command: "pulsarctl clusters create-failure-domain --domain-name --broker-list , ", + Desc: "create the failure domain with brokers", + Command: "pulsarctl clusters create-failure-domain" + + " --broker-list --broker-list ", } examples = append(examples, createWithBrokers) desc.CommandExamples = examples @@ -31,7 +33,12 @@ func createFailureDomainCmd(vc *cmdutils.VerbCmd) { Out: "Create failure domain for cluster succeed", } out = append(out, successOut) - out = append(out, argsError) + + argsErrorOut := pulsar.Output{ + Desc:"the args need to be specified as ", + Out: "[✖] need specified two names for cluster and failure domain", + } + out =append(out, argsErrorOut) out = append(out, clusterNonExist) desc.CommandOutput = out @@ -47,13 +54,19 @@ func createFailureDomainCmd(vc *cmdutils.VerbCmd) { return doCreateFailureDomain(vc, &failureDomainData) }) + checkArgs := func(args []string) error { + if len(args) != 2 { + return errors.New("need to specified two names for cluster and failure domain") + } + return nil + } + + vc.SetRunFuncWithNameArgs(func() error { + return doCreateFailureDomain(vc, &failureDomainData) + }, checkArgs) + vc.FlagSetGroup.InFlagSet("FailureDomainData", func(set *pflag.FlagSet) { - set.StringVar( - &failureDomainData.DomainName, - "domain-name", - "", - "The failure domain name") - set.StringArrayVarP( + set.StringSliceVarP( &failureDomainData.BrokerList, "broker-list", "b", @@ -63,7 +76,9 @@ func createFailureDomainCmd(vc *cmdutils.VerbCmd) { } func doCreateFailureDomain(vc *cmdutils.VerbCmd, failureDomain *pulsar.FailureDomainData) error { - failureDomain.ClusterName = vc.NameArg + failureDomain.ClusterName = vc.NameArgs[0] + failureDomain.DomainName = vc.NameArgs[1] + admin := cmdutils.NewPulsarClient() err := admin.Clusters().CreateFailureDomain(*failureDomain) if err == nil { diff --git a/pkg/ctl/cluster/create_failure_domain_test.go b/pkg/ctl/cluster/create_failure_domain_test.go index 8b5a9745..a9d49da5 100644 --- a/pkg/ctl/cluster/create_failure_domain_test.go +++ b/pkg/ctl/cluster/create_failure_domain_test.go @@ -6,7 +6,7 @@ import ( ) func TestCreateFailureDomainCmdSuccess(t *testing.T) { - args := []string{"cfd", "--domain-name", "test-domain", "standalone"} + args := []string{"create-failure-domain", "standalone", "standalone-failure-domain"} _, err := TestClusterCommands(createFailureDomainCmd, args) assert.Nil(t, err) } From 1e6457bfc874a3f928a2b7fbd984f64f83b98971 Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Tue, 3 Sep 2019 14:29:18 +0800 Subject: [PATCH 08/17] format code --- pkg/ctl/cluster/create_failure_domain.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/ctl/cluster/create_failure_domain.go b/pkg/ctl/cluster/create_failure_domain.go index 31885c36..400e2d11 100644 --- a/pkg/ctl/cluster/create_failure_domain.go +++ b/pkg/ctl/cluster/create_failure_domain.go @@ -20,7 +20,7 @@ func createFailureDomainCmd(vc *cmdutils.VerbCmd) { examples = append(examples, create) createWithBrokers := pulsar.Example{ - Desc: "create the failure domain with brokers", + Desc: "create the failure domain with brokers", Command: "pulsarctl clusters create-failure-domain" + " --broker-list --broker-list ", } @@ -35,10 +35,10 @@ func createFailureDomainCmd(vc *cmdutils.VerbCmd) { out = append(out, successOut) argsErrorOut := pulsar.Output{ - Desc:"the args need to be specified as ", - Out: "[✖] need specified two names for cluster and failure domain", + Desc: "the args need to be specified as ", + Out: "[✖] need specified two names for cluster and failure domain", } - out =append(out, argsErrorOut) + out = append(out, argsErrorOut) out = append(out, clusterNonExist) desc.CommandOutput = out From 064bb44a78c1e0e0b7efb43f8a970bbfca95d9ee Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Tue, 3 Sep 2019 16:50:29 +0800 Subject: [PATCH 09/17] Add handler to handle test error (#27) ### Motivation We can't get the error info when running a command. When testing a command, we need to verify the error output. ### Modification Make a handler to handle errors. Create a test handler in TestClusterCommand for test other commands. --- pkg/cmdutils/cmdutils.go | 9 ++++++++- pkg/cmdutils/verb.go | 10 ++++++++-- pkg/ctl/cluster/delete_test.go | 8 ++++---- pkg/ctl/cluster/get_peer_clusters_test.go | 4 ++-- pkg/ctl/cluster/get_test.go | 2 +- pkg/ctl/cluster/test.go | 15 +++++++++++++-- pkg/ctl/cluster/update_peer_clusters_test.go | 6 +++--- pkg/ctl/cluster/update_test.go | 4 ++-- 8 files changed, 41 insertions(+), 17 deletions(-) diff --git a/pkg/cmdutils/cmdutils.go b/pkg/cmdutils/cmdutils.go index 18ebfca5..82c7feb8 100644 --- a/pkg/cmdutils/cmdutils.go +++ b/pkg/cmdutils/cmdutils.go @@ -2,6 +2,7 @@ package cmdutils import ( "encoding/json" + "errors" "fmt" "io" "os" @@ -29,11 +30,17 @@ func NewResourceCmd(use, short, long string, aliases ...string) *cobra.Command { } } +var CheckNameArgError = defaultNameArgsError + +var defaultNameArgsError = func(err error) { + os.Exit(1) +} + // GetNameArg tests to ensure there is only 1 name argument func GetNameArg(args []string) string { if len(args) > 1 || len(args) == 0 { logger.Critical("only one argument is allowed to be used as a name") - os.Exit(1) + CheckNameArgError(errors.New("only one argument is allowed to be used as a name")) } if len(args) == 1 { return strings.TrimSpace(args[0]) diff --git a/pkg/cmdutils/verb.go b/pkg/cmdutils/verb.go index 0be6d8a9..21b8aab7 100644 --- a/pkg/cmdutils/verb.go +++ b/pkg/cmdutils/verb.go @@ -47,9 +47,15 @@ func (vc *VerbCmd) SetRunFuncWithNameArg(cmd func() error) { } } +var ExecErrorHandler = defaultExecErrorHandler + +var defaultExecErrorHandler = func(err error) { + logger.Critical("%s\n", err.Error()) + os.Exit(1) +} + func run(cmd func() error) { if err := cmd(); err != nil { - logger.Critical("%s\n", err.Error()) - os.Exit(1) + ExecErrorHandler(err) } } diff --git a/pkg/ctl/cluster/delete_test.go b/pkg/ctl/cluster/delete_test.go index 8d13236e..a3688ee7 100644 --- a/pkg/ctl/cluster/delete_test.go +++ b/pkg/ctl/cluster/delete_test.go @@ -8,21 +8,21 @@ import ( func TestDeleteClusterCmd(t *testing.T) { args := []string{"add", "test"} - _, err := TestClusterCommands(createClusterCmd, args) + _, _, _, err := TestClusterCommands(createClusterCmd, args) assert.Nil(t, err) args = []string{"list"} - out, err := TestClusterCommands(listClustersCmd, args) + out, _, _, err := TestClusterCommands(listClustersCmd, args) assert.Nil(t, err) clusters := out.String() assert.True(t, strings.Contains(clusters, "test")) args = []string{"delete", "test"} - _, err = TestClusterCommands(deleteClusterCmd, args) + _, _, _, err = TestClusterCommands(deleteClusterCmd, args) assert.Nil(t, err) args = []string{"list"} - out, err = TestClusterCommands(listClustersCmd, args) + out, _, _, err = TestClusterCommands(listClustersCmd, args) assert.Nil(t, err) clusters = out.String() assert.False(t, strings.Contains(clusters, "test")) diff --git a/pkg/ctl/cluster/get_peer_clusters_test.go b/pkg/ctl/cluster/get_peer_clusters_test.go index d1e9c0aa..d5008cd9 100644 --- a/pkg/ctl/cluster/get_peer_clusters_test.go +++ b/pkg/ctl/cluster/get_peer_clusters_test.go @@ -8,13 +8,13 @@ import ( func TestGetPeerClustersCmd(t *testing.T) { args := []string{"add", "test_get_peer", "--peer-cluster", "standalone"} - _, err := TestClusterCommands(createClusterCmd, args) + _, _, _, err := TestClusterCommands(createClusterCmd, args) if err != nil { t.Fatal(err) } args = []string{"gpc", "test_get_peer"} - out, err := TestClusterCommands(getPeerClustersCmd, args) + out, _, _, err := TestClusterCommands(getPeerClustersCmd, args) if err != nil { t.Fatal(err) } diff --git a/pkg/ctl/cluster/get_test.go b/pkg/ctl/cluster/get_test.go index 761d21e6..72cfb520 100644 --- a/pkg/ctl/cluster/get_test.go +++ b/pkg/ctl/cluster/get_test.go @@ -11,7 +11,7 @@ import ( func TestGetClusterData(t *testing.T) { args := []string{"get", "standalone"} - out, err := TestClusterCommands(getClusterDataCmd, args) + out, _, _, err := TestClusterCommands(getClusterDataCmd, args) if err != nil { t.Error(err) } diff --git a/pkg/ctl/cluster/test.go b/pkg/ctl/cluster/test.go index 99ba6785..cd0537e6 100644 --- a/pkg/ctl/cluster/test.go +++ b/pkg/ctl/cluster/test.go @@ -8,7 +8,18 @@ import ( "os" ) -func TestClusterCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (out *bytes.Buffer, err error) { +func TestClusterCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (out *bytes.Buffer, execErr, nameErr, err error) { + + var execError error + cmdutils.ExecErrorHandler = func(err error) { + execError = err + } + + var nameError error + cmdutils.CheckNameArgError = func(err error) { + nameError = err + } + var rootCmd = &cobra.Command { Use: "pulsarctl [command]", Short: "a CLI for Apache Pulsar", @@ -34,7 +45,7 @@ func TestClusterCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (ou rootCmd.AddCommand(resourceCmd) err = rootCmd.Execute() - return buf, err + return buf, execError, nameError, err } var ( diff --git a/pkg/ctl/cluster/update_peer_clusters_test.go b/pkg/ctl/cluster/update_peer_clusters_test.go index bc6757d7..1bd81a9b 100644 --- a/pkg/ctl/cluster/update_peer_clusters_test.go +++ b/pkg/ctl/cluster/update_peer_clusters_test.go @@ -9,19 +9,19 @@ import ( func TestUpdatePeerClusters(t *testing.T) { args := []string{"add", "test_peer_cluster"} - _, err := TestClusterCommands(createClusterCmd, args) + _, _, _, err := TestClusterCommands(createClusterCmd, args) if err != nil { t.Fatal(err) } args = []string{"update-peer-clusters", "-p", "test_peer_cluster", "standalone"} - _, err = TestClusterCommands(updatePeerClustersCmd, args) + _, _, _, err = TestClusterCommands(updatePeerClustersCmd, args) if err != nil { t.Fatal(err) } args = []string{"get", "standalone"} - out, err := TestClusterCommands(getClusterDataCmd, args) + out, _, _, err := TestClusterCommands(getClusterDataCmd, args) var clusterData pulsar.ClusterData err = json.Unmarshal(out.Bytes(), &clusterData) diff --git a/pkg/ctl/cluster/update_test.go b/pkg/ctl/cluster/update_test.go index 6b4fbf8c..89c4208c 100644 --- a/pkg/ctl/cluster/update_test.go +++ b/pkg/ctl/cluster/update_test.go @@ -19,13 +19,13 @@ func TestUpdateCluster(t *testing.T) { "standalone", } - _, err := TestClusterCommands(updateClusterCmd, args) + _, _, _, err := TestClusterCommands(updateClusterCmd, args) if err != nil { t.Error(err) } args = []string{"get", "standalone"} - out, err := TestClusterCommands(getClusterDataCmd, args) + out, _, _, err := TestClusterCommands(getClusterDataCmd, args) var data pulsar.ClusterData err = json.Unmarshal(out.Bytes(), &data) From ed3f70931163ec72f0bd61347fd5a1829fd031e6 Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Thu, 29 Aug 2019 17:31:49 +0800 Subject: [PATCH 10/17] Add command cluster `get-peer-clusters` --- Master Issue: #2 --- pkg/pulsar/cluster.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/pulsar/cluster.go b/pkg/pulsar/cluster.go index 9465a9e9..8cff24b2 100644 --- a/pkg/pulsar/cluster.go +++ b/pkg/pulsar/cluster.go @@ -51,14 +51,14 @@ func (c *clusters) Update(cdata ClusterData) error { endpoint := c.client.endpoint(c.basePath, cdata.Name) return c.client.post(endpoint, &cdata, nil) } - -func (c *clusters) UpdatePeerClusters(cluster string, peerClusters []string) error { - endpoint := c.client.endpoint(c.basePath, cluster, "peers") - return c.client.post(endpoint, peerClusters, nil) -} func (c *clusters) GetPeerClusters(name string) ([]string, error) { var peerClusters []string endpoint := c.client.endpoint(c.basePath, name, "peers") err := c.client.get(endpoint, &peerClusters) return peerClusters, err } + +func (c *clusters) UpdatePeerClusters(cluster string, peerClusters []string) error { + endpoint := c.client.endpoint(c.basePath, cluster, "peers") + return c.client.post(endpoint, peerClusters, nil) +} From 9b271e202dd276d073f4ae52d4fb41cd5759079f Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Fri, 30 Aug 2019 15:02:24 +0800 Subject: [PATCH 11/17] Add command cluster `create-failure-domain` --- pkg/ctl/cluster/cluster.go | 1 + pkg/ctl/cluster/create_failure_domain.go | 75 +++++++++++++++++++ pkg/ctl/cluster/create_failure_domain_test.go | 12 +++ pkg/pulsar/cluster.go | 5 ++ pkg/pulsar/data.go | 7 ++ 5 files changed, 100 insertions(+) create mode 100644 pkg/ctl/cluster/create_failure_domain.go create mode 100644 pkg/ctl/cluster/create_failure_domain_test.go diff --git a/pkg/ctl/cluster/cluster.go b/pkg/ctl/cluster/cluster.go index f57df01a..c8ae3593 100644 --- a/pkg/ctl/cluster/cluster.go +++ b/pkg/ctl/cluster/cluster.go @@ -30,6 +30,7 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command { cmdutils.AddVerbCmd(flagGrouping, resourceCmd, updateClusterCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, updatePeerClustersCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getPeerClustersCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, createFailureDomainCmd) return resourceCmd } diff --git a/pkg/ctl/cluster/create_failure_domain.go b/pkg/ctl/cluster/create_failure_domain.go new file mode 100644 index 00000000..5294f7c5 --- /dev/null +++ b/pkg/ctl/cluster/create_failure_domain.go @@ -0,0 +1,75 @@ +package cluster + +import ( + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" +) + +func createFailureDomainCmd(vc *cmdutils.VerbCmd) { + var desc pulsar.LongDescription + desc.CommandUsedFor = "This command is used for creating a failure domain of the ." + desc.CommandPermission = "This command requires super-user permissions." + + var examples []pulsar.Example + create := pulsar.Example{ + Desc: "creating the failure domain", + Command: "pulsarctl clusters create-failure-domain --domain-name ", + } + examples = append(examples, create) + + createWithBrokers := pulsar.Example{ + Desc: "creteing the failure domain with brokers", + Command: "pulsarctl clusters create-failure-domain --domain-name --broker-list , ", + } + examples = append(examples, createWithBrokers) + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normao output", + Out: "Create failure domain for cluster succeed", + } + out = append(out, successOut) + out = append(out, argsError) + out = append(out, clusterNonExist) + desc.CommandOutput = out + + vc.SetDescription( + "create-failure-domain", + "Create a failure domain", + desc.ToString(), + "cfd") + + var failureDomainData pulsar.FailureDomainData + + vc.SetRunFuncWithNameArg(func() error { + return doCreateFailureDomain(vc, &failureDomainData) + }) + + vc.FlagSetGroup.InFlagSet("FailureDomainData", func(set *pflag.FlagSet) { + set.StringVar( + &failureDomainData.DomainName, + "domain-name", + "", + "The failure domain name") + set.StringArrayVarP( + &failureDomainData.BrokerList, + "broker-list", + "b", + []string{""}, + "Set the failure domain clusters") + }) +} + +func doCreateFailureDomain(vc *cmdutils.VerbCmd, failureDomain *pulsar.FailureDomainData) error { + failureDomain.ClusterName = vc.NameArg + admin := cmdutils.NewPulsarClient() + err := admin.Clusters().CreateFailureDomain(*failureDomain) + if err == nil { + vc.Command.Printf( + "Create failure domain [%s] for cluster [%s] succeed\n", + failureDomain.DomainName, failureDomain.ClusterName) + } + return err +} diff --git a/pkg/ctl/cluster/create_failure_domain_test.go b/pkg/ctl/cluster/create_failure_domain_test.go new file mode 100644 index 00000000..8b5a9745 --- /dev/null +++ b/pkg/ctl/cluster/create_failure_domain_test.go @@ -0,0 +1,12 @@ +package cluster + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestCreateFailureDomainCmdSuccess(t *testing.T) { + args := []string{"cfd", "--domain-name", "test-domain", "standalone"} + _, err := TestClusterCommands(createFailureDomainCmd, args) + assert.Nil(t, err) +} diff --git a/pkg/pulsar/cluster.go b/pkg/pulsar/cluster.go index 8cff24b2..0d87a29d 100644 --- a/pkg/pulsar/cluster.go +++ b/pkg/pulsar/cluster.go @@ -10,6 +10,7 @@ type Clusters interface { Update(ClusterData) error UpdatePeerClusters(string, []string) error GetPeerClusters(string) ([]string, error) + CreateFailureDomain(FailureDomainData) error } type clusters struct { @@ -62,3 +63,7 @@ func (c *clusters) UpdatePeerClusters(cluster string, peerClusters []string) err endpoint := c.client.endpoint(c.basePath, cluster, "peers") return c.client.post(endpoint, peerClusters, nil) } +func (c *clusters) CreateFailureDomain(data FailureDomainData) error { + endpoint := c.client.endpoint(c.basePath, data.ClusterName, "failureDomains", data.DomainName) + return c.client.post(endpoint, &data, nil) +} diff --git a/pkg/pulsar/data.go b/pkg/pulsar/data.go index 029d5382..760c391f 100644 --- a/pkg/pulsar/data.go +++ b/pkg/pulsar/data.go @@ -50,3 +50,10 @@ type FunctionData struct { FuncConf *FunctionConfig `json:"-"` UserCodeFile string `json:"-"` } + +// Failure Domain information +type FailureDomainData struct { + ClusterName string `json:"-"` + DomainName string `json:"-"` + BrokerList []string `json:"brokerList"` +} From 63669051258ddd49c66477b1807b7ea693c02d6f Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Mon, 2 Sep 2019 12:07:03 +0800 Subject: [PATCH 12/17] Fix typo --- pkg/ctl/cluster/create_failure_domain.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/ctl/cluster/create_failure_domain.go b/pkg/ctl/cluster/create_failure_domain.go index 5294f7c5..8c9d8e0d 100644 --- a/pkg/ctl/cluster/create_failure_domain.go +++ b/pkg/ctl/cluster/create_failure_domain.go @@ -19,7 +19,7 @@ func createFailureDomainCmd(vc *cmdutils.VerbCmd) { examples = append(examples, create) createWithBrokers := pulsar.Example{ - Desc: "creteing the failure domain with brokers", + Desc: "creating the failure domain with brokers", Command: "pulsarctl clusters create-failure-domain --domain-name --broker-list , ", } examples = append(examples, createWithBrokers) @@ -27,7 +27,7 @@ func createFailureDomainCmd(vc *cmdutils.VerbCmd) { var out []pulsar.Output successOut := pulsar.Output{ - Desc: "normao output", + Desc: "normal output", Out: "Create failure domain for cluster succeed", } out = append(out, successOut) From 562ef27ed08b5fbfed525b47c8b9ffd75e59451e Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Tue, 3 Sep 2019 10:45:28 +0800 Subject: [PATCH 13/17] Add an empty line --- pkg/pulsar/cluster.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/pulsar/cluster.go b/pkg/pulsar/cluster.go index 0d87a29d..052bd820 100644 --- a/pkg/pulsar/cluster.go +++ b/pkg/pulsar/cluster.go @@ -63,6 +63,7 @@ func (c *clusters) UpdatePeerClusters(cluster string, peerClusters []string) err endpoint := c.client.endpoint(c.basePath, cluster, "peers") return c.client.post(endpoint, peerClusters, nil) } +  func (c *clusters) CreateFailureDomain(data FailureDomainData) error { endpoint := c.client.endpoint(c.basePath, data.ClusterName, "failureDomains", data.DomainName) return c.client.post(endpoint, &data, nil) From 851cbf8dd5f477cb36d2512eeefdde7e16306557 Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Tue, 3 Sep 2019 10:49:19 +0800 Subject: [PATCH 14/17] Fix build --- pkg/pulsar/cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pulsar/cluster.go b/pkg/pulsar/cluster.go index 052bd820..a98b8cf3 100644 --- a/pkg/pulsar/cluster.go +++ b/pkg/pulsar/cluster.go @@ -63,7 +63,7 @@ func (c *clusters) UpdatePeerClusters(cluster string, peerClusters []string) err endpoint := c.client.endpoint(c.basePath, cluster, "peers") return c.client.post(endpoint, peerClusters, nil) } -  + func (c *clusters) CreateFailureDomain(data FailureDomainData) error { endpoint := c.client.endpoint(c.basePath, data.ClusterName, "failureDomains", data.DomainName) return c.client.post(endpoint, &data, nil) From 3d4ccb614dd5ec38b20d1e556769b846332fb70b Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Tue, 3 Sep 2019 14:26:18 +0800 Subject: [PATCH 15/17] Address comments --- go.mod | 2 +- pkg/cmdutils/cmdutils.go | 20 +++++++--- pkg/cmdutils/verb.go | 22 +++++++---- pkg/ctl/cluster/create_failure_domain.go | 39 +++++++++++++------ pkg/ctl/cluster/create_failure_domain_test.go | 2 +- 5 files changed, 58 insertions(+), 27 deletions(-) diff --git a/go.mod b/go.mod index c72d83b7..95083551 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/mattn/go-colorable v0.1.2 // indirect github.com/mattn/go-runewidth v0.0.4 // indirect github.com/olekukonko/tablewriter v0.0.1 - github.com/pkg/errors v0.8.1 // indirect + github.com/pkg/errors v0.8.1 github.com/spf13/cobra v0.0.5 github.com/spf13/pflag v1.0.3 github.com/stretchr/objx v0.2.0 // indirect diff --git a/pkg/cmdutils/cmdutils.go b/pkg/cmdutils/cmdutils.go index 82c7feb8..643ae23a 100644 --- a/pkg/cmdutils/cmdutils.go +++ b/pkg/cmdutils/cmdutils.go @@ -18,10 +18,10 @@ const IncompatibleFlags = "cannot be used at the same time" // NewVerbCmd defines a standard resource command func NewResourceCmd(use, short, long string, aliases ...string) *cobra.Command { return &cobra.Command{ - Use: use, - Short: short, - Long: long, - Aliases: aliases, + Use: use, + Short: short, + Long: long, + Aliases: aliases, Run: func(cmd *cobra.Command, _ []string) { if err := cmd.Help(); err != nil { logger.Debug("ignoring error %q", err.Error()) @@ -48,6 +48,15 @@ func GetNameArg(args []string) string { return "" } +func GetNameArgs(args []string, check func(args []string) error) []string { + err := check(args) + if err != nil { + logger.Critical(err.Error()) + CheckNameArgError(err) + } + return args +} + func NewPulsarClient() pulsar.Client { return PulsarCtlConfig.Client(pulsar.V2) } @@ -65,7 +74,6 @@ func PrintJson(w io.Writer, obj interface{}) { fmt.Fprintln(w, string(b)) } - func PrintError(w io.Writer, err error) { msg := err.Error() if pulsar.IsAdminError(err) { @@ -73,4 +81,4 @@ func PrintError(w io.Writer, err error) { msg = ae.Reason } fmt.Fprintln(w, "error:", msg) -} \ No newline at end of file +} diff --git a/pkg/cmdutils/verb.go b/pkg/cmdutils/verb.go index 21b8aab7..567439d2 100644 --- a/pkg/cmdutils/verb.go +++ b/pkg/cmdutils/verb.go @@ -8,15 +8,16 @@ import ( // VerbCmd holds attributes that most of the commands use type VerbCmd struct { - Command *cobra.Command - FlagSetGroup *NamedFlagSetGroup - NameArg string + Command *cobra.Command + FlagSetGroup *NamedFlagSetGroup + NameArg string + NameArgs []string } // AddVerbCmd create a registers a new command under the given resource command func AddVerbCmd(flagGrouping *FlagGrouping, parentResourceCmd *cobra.Command, newVerbCmd func(*VerbCmd)) { - verb := &VerbCmd { - Command: &cobra.Command{}, + verb := &VerbCmd{ + Command: &cobra.Command{}, } verb.FlagSetGroup = flagGrouping.New(verb.Command) newVerbCmd(verb) @@ -28,7 +29,7 @@ func AddVerbCmd(flagGrouping *FlagGrouping, parentResourceCmd *cobra.Command, ne func (vc *VerbCmd) SetDescription(use, short, long string, aliases ...string) { vc.Command.Use = use vc.Command.Short = short - vc.Command.Long = long + vc.Command.Long = long vc.Command.Aliases = aliases } @@ -47,7 +48,14 @@ func (vc *VerbCmd) SetRunFuncWithNameArg(cmd func() error) { } } -var ExecErrorHandler = defaultExecErrorHandler +func (vc *VerbCmd) SetRunFuncWithNameArgs(cmd func() error, checkArgs func(args []string) error) { + vc.Command.Run = func(_ *cobra.Command, args []string) { + vc.NameArgs = GetNameArgs(args, checkArgs) + run(cmd) + } +} + +var ExecErrorHandler = defaultExecErrorHandler var defaultExecErrorHandler = func(err error) { logger.Critical("%s\n", err.Error()) diff --git a/pkg/ctl/cluster/create_failure_domain.go b/pkg/ctl/cluster/create_failure_domain.go index 8c9d8e0d..31885c36 100644 --- a/pkg/ctl/cluster/create_failure_domain.go +++ b/pkg/ctl/cluster/create_failure_domain.go @@ -1,6 +1,7 @@ package cluster import ( + "github.com/pkg/errors" "github.com/spf13/pflag" "github.com/streamnative/pulsarctl/pkg/cmdutils" "github.com/streamnative/pulsarctl/pkg/pulsar" @@ -13,14 +14,15 @@ func createFailureDomainCmd(vc *cmdutils.VerbCmd) { var examples []pulsar.Example create := pulsar.Example{ - Desc: "creating the failure domain", - Command: "pulsarctl clusters create-failure-domain --domain-name ", + Desc: "create the failure domain", + Command: "pulsarctl clusters create-failure-domain ", } examples = append(examples, create) createWithBrokers := pulsar.Example{ - Desc: "creating the failure domain with brokers", - Command: "pulsarctl clusters create-failure-domain --domain-name --broker-list , ", + Desc: "create the failure domain with brokers", + Command: "pulsarctl clusters create-failure-domain" + + " --broker-list --broker-list ", } examples = append(examples, createWithBrokers) desc.CommandExamples = examples @@ -31,7 +33,12 @@ func createFailureDomainCmd(vc *cmdutils.VerbCmd) { Out: "Create failure domain for cluster succeed", } out = append(out, successOut) - out = append(out, argsError) + + argsErrorOut := pulsar.Output{ + Desc:"the args need to be specified as ", + Out: "[✖] need specified two names for cluster and failure domain", + } + out =append(out, argsErrorOut) out = append(out, clusterNonExist) desc.CommandOutput = out @@ -47,13 +54,19 @@ func createFailureDomainCmd(vc *cmdutils.VerbCmd) { return doCreateFailureDomain(vc, &failureDomainData) }) + checkArgs := func(args []string) error { + if len(args) != 2 { + return errors.New("need to specified two names for cluster and failure domain") + } + return nil + } + + vc.SetRunFuncWithNameArgs(func() error { + return doCreateFailureDomain(vc, &failureDomainData) + }, checkArgs) + vc.FlagSetGroup.InFlagSet("FailureDomainData", func(set *pflag.FlagSet) { - set.StringVar( - &failureDomainData.DomainName, - "domain-name", - "", - "The failure domain name") - set.StringArrayVarP( + set.StringSliceVarP( &failureDomainData.BrokerList, "broker-list", "b", @@ -63,7 +76,9 @@ func createFailureDomainCmd(vc *cmdutils.VerbCmd) { } func doCreateFailureDomain(vc *cmdutils.VerbCmd, failureDomain *pulsar.FailureDomainData) error { - failureDomain.ClusterName = vc.NameArg + failureDomain.ClusterName = vc.NameArgs[0] + failureDomain.DomainName = vc.NameArgs[1] + admin := cmdutils.NewPulsarClient() err := admin.Clusters().CreateFailureDomain(*failureDomain) if err == nil { diff --git a/pkg/ctl/cluster/create_failure_domain_test.go b/pkg/ctl/cluster/create_failure_domain_test.go index 8b5a9745..a9d49da5 100644 --- a/pkg/ctl/cluster/create_failure_domain_test.go +++ b/pkg/ctl/cluster/create_failure_domain_test.go @@ -6,7 +6,7 @@ import ( ) func TestCreateFailureDomainCmdSuccess(t *testing.T) { - args := []string{"cfd", "--domain-name", "test-domain", "standalone"} + args := []string{"create-failure-domain", "standalone", "standalone-failure-domain"} _, err := TestClusterCommands(createFailureDomainCmd, args) assert.Nil(t, err) } From a0d049a396677b022a5697af5400cd629215ed5c Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Tue, 3 Sep 2019 14:29:18 +0800 Subject: [PATCH 16/17] format code --- pkg/ctl/cluster/create_failure_domain.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/ctl/cluster/create_failure_domain.go b/pkg/ctl/cluster/create_failure_domain.go index 31885c36..400e2d11 100644 --- a/pkg/ctl/cluster/create_failure_domain.go +++ b/pkg/ctl/cluster/create_failure_domain.go @@ -20,7 +20,7 @@ func createFailureDomainCmd(vc *cmdutils.VerbCmd) { examples = append(examples, create) createWithBrokers := pulsar.Example{ - Desc: "create the failure domain with brokers", + Desc: "create the failure domain with brokers", Command: "pulsarctl clusters create-failure-domain" + " --broker-list --broker-list ", } @@ -35,10 +35,10 @@ func createFailureDomainCmd(vc *cmdutils.VerbCmd) { out = append(out, successOut) argsErrorOut := pulsar.Output{ - Desc:"the args need to be specified as ", - Out: "[✖] need specified two names for cluster and failure domain", + Desc: "the args need to be specified as ", + Out: "[✖] need specified two names for cluster and failure domain", } - out =append(out, argsErrorOut) + out = append(out, argsErrorOut) out = append(out, clusterNonExist) desc.CommandOutput = out From a38ce8648d3a89d01269acb5bfca5be8bb3b79d9 Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Tue, 3 Sep 2019 23:17:58 +0800 Subject: [PATCH 17/17] address comments --- pkg/ctl/cluster/create_failure_domain.go | 8 ++++++-- pkg/ctl/cluster/create_failure_domain_test.go | 12 ++++++++++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/pkg/ctl/cluster/create_failure_domain.go b/pkg/ctl/cluster/create_failure_domain.go index 400e2d11..8e4f7f14 100644 --- a/pkg/ctl/cluster/create_failure_domain.go +++ b/pkg/ctl/cluster/create_failure_domain.go @@ -1,7 +1,7 @@ package cluster import ( - "github.com/pkg/errors" + "errors" "github.com/spf13/pflag" "github.com/streamnative/pulsarctl/pkg/cmdutils" "github.com/streamnative/pulsarctl/pkg/pulsar" @@ -70,7 +70,7 @@ func createFailureDomainCmd(vc *cmdutils.VerbCmd) { &failureDomainData.BrokerList, "broker-list", "b", - []string{""}, + nil, "Set the failure domain clusters") }) } @@ -79,6 +79,10 @@ func doCreateFailureDomain(vc *cmdutils.VerbCmd, failureDomain *pulsar.FailureDo failureDomain.ClusterName = vc.NameArgs[0] failureDomain.DomainName = vc.NameArgs[1] + if len(failureDomain.BrokerList) == 0 || failureDomain.BrokerList == nil { + return errors.New("broker list must be specified") + } + admin := cmdutils.NewPulsarClient() err := admin.Clusters().CreateFailureDomain(*failureDomain) if err == nil { diff --git a/pkg/ctl/cluster/create_failure_domain_test.go b/pkg/ctl/cluster/create_failure_domain_test.go index a9d49da5..e35c63f0 100644 --- a/pkg/ctl/cluster/create_failure_domain_test.go +++ b/pkg/ctl/cluster/create_failure_domain_test.go @@ -6,7 +6,15 @@ import ( ) func TestCreateFailureDomainCmdSuccess(t *testing.T) { - args := []string{"create-failure-domain", "standalone", "standalone-failure-domain"} - _, err := TestClusterCommands(createFailureDomainCmd, args) + args := []string{"create-failure-domain", "-b", "cluster-A", "standalone", "standalone-failure-domain"} + _, execErr, NameErr, err := TestClusterCommands(createFailureDomainCmd, args) + assert.Nil(t, execErr) + assert.Nil(t, NameErr) assert.Nil(t, err) } + +func TestCreateFailureDomainCmdBrokerListError(t *testing.T) { + args := []string{"create-failure-domain", "standalone", "standalone-failure-domain"} + _, execErr, _, _ := TestClusterCommands(createFailureDomainCmd, args) + assert.Equal(t, "broker list must be specified", execErr.Error()) +}