diff --git a/go.mod b/go.mod index c72d83b7..95083551 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/mattn/go-colorable v0.1.2 // indirect github.com/mattn/go-runewidth v0.0.4 // indirect github.com/olekukonko/tablewriter v0.0.1 - github.com/pkg/errors v0.8.1 // indirect + github.com/pkg/errors v0.8.1 github.com/spf13/cobra v0.0.5 github.com/spf13/pflag v1.0.3 github.com/stretchr/objx v0.2.0 // indirect diff --git a/pkg/cmdutils/cmdutils.go b/pkg/cmdutils/cmdutils.go index 18ebfca5..643ae23a 100644 --- a/pkg/cmdutils/cmdutils.go +++ b/pkg/cmdutils/cmdutils.go @@ -2,6 +2,7 @@ package cmdutils import ( "encoding/json" + "errors" "fmt" "io" "os" @@ -17,10 +18,10 @@ const IncompatibleFlags = "cannot be used at the same time" // NewVerbCmd defines a standard resource command func NewResourceCmd(use, short, long string, aliases ...string) *cobra.Command { return &cobra.Command{ - Use: use, - Short: short, - Long: long, - Aliases: aliases, + Use: use, + Short: short, + Long: long, + Aliases: aliases, Run: func(cmd *cobra.Command, _ []string) { if err := cmd.Help(); err != nil { logger.Debug("ignoring error %q", err.Error()) @@ -29,11 +30,17 @@ func NewResourceCmd(use, short, long string, aliases ...string) *cobra.Command { } } +var CheckNameArgError = defaultNameArgsError + +var defaultNameArgsError = func(err error) { + os.Exit(1) +} + // GetNameArg tests to ensure there is only 1 name argument func GetNameArg(args []string) string { if len(args) > 1 || len(args) == 0 { logger.Critical("only one argument is allowed to be used as a name") - os.Exit(1) + CheckNameArgError(errors.New("only one argument is allowed to be used as a name")) } if len(args) == 1 { return strings.TrimSpace(args[0]) @@ -41,6 +48,15 @@ func GetNameArg(args []string) string { return "" } +func GetNameArgs(args []string, check func(args []string) error) []string { + err := check(args) + if err != nil { + logger.Critical(err.Error()) + CheckNameArgError(err) + } + return args +} + func NewPulsarClient() pulsar.Client { return PulsarCtlConfig.Client(pulsar.V2) } @@ -58,7 +74,6 @@ func PrintJson(w io.Writer, obj interface{}) { fmt.Fprintln(w, string(b)) } - func PrintError(w io.Writer, err error) { msg := err.Error() if pulsar.IsAdminError(err) { @@ -66,4 +81,4 @@ func PrintError(w io.Writer, err error) { msg = ae.Reason } fmt.Fprintln(w, "error:", msg) -} \ No newline at end of file +} diff --git a/pkg/cmdutils/verb.go b/pkg/cmdutils/verb.go index 0be6d8a9..567439d2 100644 --- a/pkg/cmdutils/verb.go +++ b/pkg/cmdutils/verb.go @@ -8,15 +8,16 @@ import ( // VerbCmd holds attributes that most of the commands use type VerbCmd struct { - Command *cobra.Command - FlagSetGroup *NamedFlagSetGroup - NameArg string + Command *cobra.Command + FlagSetGroup *NamedFlagSetGroup + NameArg string + NameArgs []string } // AddVerbCmd create a registers a new command under the given resource command func AddVerbCmd(flagGrouping *FlagGrouping, parentResourceCmd *cobra.Command, newVerbCmd func(*VerbCmd)) { - verb := &VerbCmd { - Command: &cobra.Command{}, + verb := &VerbCmd{ + Command: &cobra.Command{}, } verb.FlagSetGroup = flagGrouping.New(verb.Command) newVerbCmd(verb) @@ -28,7 +29,7 @@ func AddVerbCmd(flagGrouping *FlagGrouping, parentResourceCmd *cobra.Command, ne func (vc *VerbCmd) SetDescription(use, short, long string, aliases ...string) { vc.Command.Use = use vc.Command.Short = short - vc.Command.Long = long + vc.Command.Long = long vc.Command.Aliases = aliases } @@ -47,9 +48,22 @@ func (vc *VerbCmd) SetRunFuncWithNameArg(cmd func() error) { } } +func (vc *VerbCmd) SetRunFuncWithNameArgs(cmd func() error, checkArgs func(args []string) error) { + vc.Command.Run = func(_ *cobra.Command, args []string) { + vc.NameArgs = GetNameArgs(args, checkArgs) + run(cmd) + } +} + +var ExecErrorHandler = defaultExecErrorHandler + +var defaultExecErrorHandler = func(err error) { + logger.Critical("%s\n", err.Error()) + os.Exit(1) +} + func run(cmd func() error) { if err := cmd(); err != nil { - logger.Critical("%s\n", err.Error()) - os.Exit(1) + ExecErrorHandler(err) } } diff --git a/pkg/ctl/cluster/cluster.go b/pkg/ctl/cluster/cluster.go index f57df01a..c8ae3593 100644 --- a/pkg/ctl/cluster/cluster.go +++ b/pkg/ctl/cluster/cluster.go @@ -30,6 +30,7 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command { cmdutils.AddVerbCmd(flagGrouping, resourceCmd, updateClusterCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, updatePeerClustersCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getPeerClustersCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, createFailureDomainCmd) return resourceCmd } diff --git a/pkg/ctl/cluster/create_failure_domain.go b/pkg/ctl/cluster/create_failure_domain.go new file mode 100644 index 00000000..8e4f7f14 --- /dev/null +++ b/pkg/ctl/cluster/create_failure_domain.go @@ -0,0 +1,94 @@ +package cluster + +import ( + "errors" + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" +) + +func createFailureDomainCmd(vc *cmdutils.VerbCmd) { + var desc pulsar.LongDescription + desc.CommandUsedFor = "This command is used for creating a failure domain of the ." + desc.CommandPermission = "This command requires super-user permissions." + + var examples []pulsar.Example + create := pulsar.Example{ + Desc: "create the failure domain", + Command: "pulsarctl clusters create-failure-domain ", + } + examples = append(examples, create) + + createWithBrokers := pulsar.Example{ + Desc: "create the failure domain with brokers", + Command: "pulsarctl clusters create-failure-domain" + + " --broker-list --broker-list ", + } + examples = append(examples, createWithBrokers) + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normal output", + Out: "Create failure domain for cluster succeed", + } + out = append(out, successOut) + + argsErrorOut := pulsar.Output{ + Desc: "the args need to be specified as ", + Out: "[✖] need specified two names for cluster and failure domain", + } + out = append(out, argsErrorOut) + out = append(out, clusterNonExist) + desc.CommandOutput = out + + vc.SetDescription( + "create-failure-domain", + "Create a failure domain", + desc.ToString(), + "cfd") + + var failureDomainData pulsar.FailureDomainData + + vc.SetRunFuncWithNameArg(func() error { + return doCreateFailureDomain(vc, &failureDomainData) + }) + + checkArgs := func(args []string) error { + if len(args) != 2 { + return errors.New("need to specified two names for cluster and failure domain") + } + return nil + } + + vc.SetRunFuncWithNameArgs(func() error { + return doCreateFailureDomain(vc, &failureDomainData) + }, checkArgs) + + vc.FlagSetGroup.InFlagSet("FailureDomainData", func(set *pflag.FlagSet) { + set.StringSliceVarP( + &failureDomainData.BrokerList, + "broker-list", + "b", + nil, + "Set the failure domain clusters") + }) +} + +func doCreateFailureDomain(vc *cmdutils.VerbCmd, failureDomain *pulsar.FailureDomainData) error { + failureDomain.ClusterName = vc.NameArgs[0] + failureDomain.DomainName = vc.NameArgs[1] + + if len(failureDomain.BrokerList) == 0 || failureDomain.BrokerList == nil { + return errors.New("broker list must be specified") + } + + admin := cmdutils.NewPulsarClient() + err := admin.Clusters().CreateFailureDomain(*failureDomain) + if err == nil { + vc.Command.Printf( + "Create failure domain [%s] for cluster [%s] succeed\n", + failureDomain.DomainName, failureDomain.ClusterName) + } + return err +} diff --git a/pkg/ctl/cluster/create_failure_domain_test.go b/pkg/ctl/cluster/create_failure_domain_test.go new file mode 100644 index 00000000..e35c63f0 --- /dev/null +++ b/pkg/ctl/cluster/create_failure_domain_test.go @@ -0,0 +1,20 @@ +package cluster + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestCreateFailureDomainCmdSuccess(t *testing.T) { + args := []string{"create-failure-domain", "-b", "cluster-A", "standalone", "standalone-failure-domain"} + _, execErr, NameErr, err := TestClusterCommands(createFailureDomainCmd, args) + assert.Nil(t, execErr) + assert.Nil(t, NameErr) + assert.Nil(t, err) +} + +func TestCreateFailureDomainCmdBrokerListError(t *testing.T) { + args := []string{"create-failure-domain", "standalone", "standalone-failure-domain"} + _, execErr, _, _ := TestClusterCommands(createFailureDomainCmd, args) + assert.Equal(t, "broker list must be specified", execErr.Error()) +} diff --git a/pkg/ctl/cluster/delete_test.go b/pkg/ctl/cluster/delete_test.go index 8d13236e..a3688ee7 100644 --- a/pkg/ctl/cluster/delete_test.go +++ b/pkg/ctl/cluster/delete_test.go @@ -8,21 +8,21 @@ import ( func TestDeleteClusterCmd(t *testing.T) { args := []string{"add", "test"} - _, err := TestClusterCommands(createClusterCmd, args) + _, _, _, err := TestClusterCommands(createClusterCmd, args) assert.Nil(t, err) args = []string{"list"} - out, err := TestClusterCommands(listClustersCmd, args) + out, _, _, err := TestClusterCommands(listClustersCmd, args) assert.Nil(t, err) clusters := out.String() assert.True(t, strings.Contains(clusters, "test")) args = []string{"delete", "test"} - _, err = TestClusterCommands(deleteClusterCmd, args) + _, _, _, err = TestClusterCommands(deleteClusterCmd, args) assert.Nil(t, err) args = []string{"list"} - out, err = TestClusterCommands(listClustersCmd, args) + out, _, _, err = TestClusterCommands(listClustersCmd, args) assert.Nil(t, err) clusters = out.String() assert.False(t, strings.Contains(clusters, "test")) diff --git a/pkg/ctl/cluster/get_peer_clusters_test.go b/pkg/ctl/cluster/get_peer_clusters_test.go index d1e9c0aa..d5008cd9 100644 --- a/pkg/ctl/cluster/get_peer_clusters_test.go +++ b/pkg/ctl/cluster/get_peer_clusters_test.go @@ -8,13 +8,13 @@ import ( func TestGetPeerClustersCmd(t *testing.T) { args := []string{"add", "test_get_peer", "--peer-cluster", "standalone"} - _, err := TestClusterCommands(createClusterCmd, args) + _, _, _, err := TestClusterCommands(createClusterCmd, args) if err != nil { t.Fatal(err) } args = []string{"gpc", "test_get_peer"} - out, err := TestClusterCommands(getPeerClustersCmd, args) + out, _, _, err := TestClusterCommands(getPeerClustersCmd, args) if err != nil { t.Fatal(err) } diff --git a/pkg/ctl/cluster/get_test.go b/pkg/ctl/cluster/get_test.go index 761d21e6..72cfb520 100644 --- a/pkg/ctl/cluster/get_test.go +++ b/pkg/ctl/cluster/get_test.go @@ -11,7 +11,7 @@ import ( func TestGetClusterData(t *testing.T) { args := []string{"get", "standalone"} - out, err := TestClusterCommands(getClusterDataCmd, args) + out, _, _, err := TestClusterCommands(getClusterDataCmd, args) if err != nil { t.Error(err) } diff --git a/pkg/ctl/cluster/test.go b/pkg/ctl/cluster/test.go index 99ba6785..cd0537e6 100644 --- a/pkg/ctl/cluster/test.go +++ b/pkg/ctl/cluster/test.go @@ -8,7 +8,18 @@ import ( "os" ) -func TestClusterCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (out *bytes.Buffer, err error) { +func TestClusterCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (out *bytes.Buffer, execErr, nameErr, err error) { + + var execError error + cmdutils.ExecErrorHandler = func(err error) { + execError = err + } + + var nameError error + cmdutils.CheckNameArgError = func(err error) { + nameError = err + } + var rootCmd = &cobra.Command { Use: "pulsarctl [command]", Short: "a CLI for Apache Pulsar", @@ -34,7 +45,7 @@ func TestClusterCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (ou rootCmd.AddCommand(resourceCmd) err = rootCmd.Execute() - return buf, err + return buf, execError, nameError, err } var ( diff --git a/pkg/ctl/cluster/update_peer_clusters_test.go b/pkg/ctl/cluster/update_peer_clusters_test.go index bc6757d7..1bd81a9b 100644 --- a/pkg/ctl/cluster/update_peer_clusters_test.go +++ b/pkg/ctl/cluster/update_peer_clusters_test.go @@ -9,19 +9,19 @@ import ( func TestUpdatePeerClusters(t *testing.T) { args := []string{"add", "test_peer_cluster"} - _, err := TestClusterCommands(createClusterCmd, args) + _, _, _, err := TestClusterCommands(createClusterCmd, args) if err != nil { t.Fatal(err) } args = []string{"update-peer-clusters", "-p", "test_peer_cluster", "standalone"} - _, err = TestClusterCommands(updatePeerClustersCmd, args) + _, _, _, err = TestClusterCommands(updatePeerClustersCmd, args) if err != nil { t.Fatal(err) } args = []string{"get", "standalone"} - out, err := TestClusterCommands(getClusterDataCmd, args) + out, _, _, err := TestClusterCommands(getClusterDataCmd, args) var clusterData pulsar.ClusterData err = json.Unmarshal(out.Bytes(), &clusterData) diff --git a/pkg/ctl/cluster/update_test.go b/pkg/ctl/cluster/update_test.go index 6b4fbf8c..89c4208c 100644 --- a/pkg/ctl/cluster/update_test.go +++ b/pkg/ctl/cluster/update_test.go @@ -19,13 +19,13 @@ func TestUpdateCluster(t *testing.T) { "standalone", } - _, err := TestClusterCommands(updateClusterCmd, args) + _, _, _, err := TestClusterCommands(updateClusterCmd, args) if err != nil { t.Error(err) } args = []string{"get", "standalone"} - out, err := TestClusterCommands(getClusterDataCmd, args) + out, _, _, err := TestClusterCommands(getClusterDataCmd, args) var data pulsar.ClusterData err = json.Unmarshal(out.Bytes(), &data) diff --git a/pkg/ctl/functions/functions.go b/pkg/ctl/functions/functions.go index b4b82474..e92891f5 100644 --- a/pkg/ctl/functions/functions.go +++ b/pkg/ctl/functions/functions.go @@ -32,6 +32,7 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command { ) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, createFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, stopFunctionsCmd) return resourceCmd } diff --git a/pkg/ctl/functions/stop.go b/pkg/ctl/functions/stop.go new file mode 100644 index 00000000..1844e67c --- /dev/null +++ b/pkg/ctl/functions/stop.go @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "strconv" +) + +func stopFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "This command is used for stopping function instance." + desc.CommandPermission = "This command requires super-user permissions." + + var examples []pulsar.Example + + stop := pulsar.Example{ + Desc: "Stops function instance", + Command: "pulsarctl functions stop \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name ", + } + examples = append(examples, stop) + + stopWithInstanceID := pulsar.Example{ + Desc: "Stops function instance with instance ID", + Command: "pulsarctl functions stop \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name \n" + + "\t--instance-id 1", + } + examples = append(examples, stopWithInstanceID) + + stopWithFQFN := pulsar.Example{ + Desc: "Stops function instance with FQFN", + Command: "pulsarctl functions stop \n" + + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions]", + } + examples = append(examples, stopWithFQFN) + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normal output", + Out: "Stopped successfully", + } + + out = append(out, successOut) + desc.CommandOutput = out + + vc.SetDescription( + "stop", + "", + desc.ToString(), + "stop", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFunc(func() error { + return doStopFunctions(vc, functionData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.FQFN, + "fqfn", + "", + "The Fully Qualified Function Name (FQFN) for the function") + + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + + flagSet.StringVar( + &functionData.FuncName, + "name", + "", + "The name of a Pulsar Function") + + flagSet.StringVar( + &functionData.InstanceID, + "instance-id", + "", + "The function instanceId (stop all instances if instance-id is not provided)") + }) +} + +func doStopFunctions(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + err := processBaseArguments(funcData) + if err != nil { + vc.Command.Help() + return err + } + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + if funcData.InstanceID != "" { + instanceID, err := strconv.Atoi(funcData.InstanceID) + if err != nil { + return err + } + err = admin.Functions().StopFunctionWithID(funcData.Tenant, funcData.Namespace, funcData.FuncName, instanceID) + if err != nil { + return err + } + vc.Command.Printf("Stopped %s successfully", funcData.FuncName) + } else { + err = admin.Functions().StopFunction(funcData.Tenant, funcData.Namespace, funcData.FuncName) + if err != nil { + return err + } + + vc.Command.Printf("Stopped %s successfully", funcData.FuncName) + } + + return nil +} diff --git a/pkg/ctl/functions/stop_test.go b/pkg/ctl/functions/stop_test.go new file mode 100644 index 00000000..f574cc42 --- /dev/null +++ b/pkg/ctl/functions/stop_test.go @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/stretchr/testify/assert" + "os" + "testing" +) + +func TestStopFunctions(t *testing.T) { + jarName := "dummyExample.jar" + _, err := os.Create(jarName) + assert.Nil(t, err) + + defer os.Remove(jarName) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-stop", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, err = TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + + stopArgs := []string{"stop", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-stop", + } + + _, err = TestFunctionsCommands(stopFunctionsCmd, stopArgs) + assert.Nil(t, err) + + argsFqfn := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-stop-fqfn", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, err = TestFunctionsCommands(createFunctionsCmd, argsFqfn) + assert.Nil(t, err) + + stopArgsFqfn := []string{"stop", + "--fqfn", "public/default/test-functions-stop-fqfn", + } + + _, err = TestFunctionsCommands(stopFunctionsCmd, stopArgsFqfn) + assert.Nil(t, err) +} diff --git a/pkg/ctl/functions/util.go b/pkg/ctl/functions/util.go index 910a4cbc..30756e4b 100644 --- a/pkg/ctl/functions/util.go +++ b/pkg/ctl/functions/util.go @@ -382,3 +382,37 @@ func validateFunctionConfigs(functionConfig *pulsar.FunctionConfig) error { return nil } + +func processBaseArguments(funcData *pulsar.FunctionData) error { + usesSetters := funcData.Tenant != "" || funcData.Namespace != "" || funcData.FuncName != "" + usesFqfn := funcData.FQFN != "" + + // return error if --fqfn is set alongside any combination of --tenant, --namespace, and --name + if usesFqfn && usesSetters { + return errors.New("you must specify either a Fully Qualified Function Name (FQFN) or tenant, namespace, and function name") + } else if usesFqfn { + // If the --fqfn flag is used, parse tenant, namespace, and name using that flag + fqfnParts := strings.Split(funcData.FQFN, "/") + if len(fqfnParts) != 3 { + return errors.New("fully qualified function names (FQFNs) must be of the form tenant/namespace/name") + } + + funcData.Tenant = fqfnParts[0] + funcData.Namespace = fqfnParts[1] + funcData.FuncName = fqfnParts[2] + } else { + if funcData.Tenant == "" { + funcData.Tenant = PublicTenant + } + + if funcData.Namespace == "" { + funcData.Namespace = DefaultNamespace + } + + if funcData.FuncName == "" { + return errors.New("you must specify a name for the function or a Fully Qualified Function Name (FQFN)") + } + } + + return nil +} diff --git a/pkg/pulsar/cluster.go b/pkg/pulsar/cluster.go index 9465a9e9..a98b8cf3 100644 --- a/pkg/pulsar/cluster.go +++ b/pkg/pulsar/cluster.go @@ -10,6 +10,7 @@ type Clusters interface { Update(ClusterData) error UpdatePeerClusters(string, []string) error GetPeerClusters(string) ([]string, error) + CreateFailureDomain(FailureDomainData) error } type clusters struct { @@ -51,14 +52,19 @@ func (c *clusters) Update(cdata ClusterData) error { endpoint := c.client.endpoint(c.basePath, cdata.Name) return c.client.post(endpoint, &cdata, nil) } - -func (c *clusters) UpdatePeerClusters(cluster string, peerClusters []string) error { - endpoint := c.client.endpoint(c.basePath, cluster, "peers") - return c.client.post(endpoint, peerClusters, nil) -} func (c *clusters) GetPeerClusters(name string) ([]string, error) { var peerClusters []string endpoint := c.client.endpoint(c.basePath, name, "peers") err := c.client.get(endpoint, &peerClusters) return peerClusters, err } + +func (c *clusters) UpdatePeerClusters(cluster string, peerClusters []string) error { + endpoint := c.client.endpoint(c.basePath, cluster, "peers") + return c.client.post(endpoint, peerClusters, nil) +} + +func (c *clusters) CreateFailureDomain(data FailureDomainData) error { + endpoint := c.client.endpoint(c.basePath, data.ClusterName, "failureDomains", data.DomainName) + return c.client.post(endpoint, &data, nil) +} diff --git a/pkg/pulsar/data.go b/pkg/pulsar/data.go index dbad0508..760c391f 100644 --- a/pkg/pulsar/data.go +++ b/pkg/pulsar/data.go @@ -16,6 +16,7 @@ type FunctionData struct { Tenant string `json:"tenant"` Namespace string `json:"namespace"` FuncName string `json:"functionName"` + InstanceID string `json:"instance_id"` ClassName string `json:"className"` Jar string `json:"jarFile"` Py string `json:"pyFile"` @@ -49,3 +50,10 @@ type FunctionData struct { FuncConf *FunctionConfig `json:"-"` UserCodeFile string `json:"-"` } + +// Failure Domain information +type FailureDomainData struct { + ClusterName string `json:"-"` + DomainName string `json:"-"` + BrokerList []string `json:"brokerList"` +} diff --git a/pkg/pulsar/functions.go b/pkg/pulsar/functions.go index db09a619..d5352039 100644 --- a/pkg/pulsar/functions.go +++ b/pkg/pulsar/functions.go @@ -43,6 +43,12 @@ type Functions interface { // @param pkgUrl // url from which pkg can be downloaded CreateFuncWithUrl(data *FunctionConfig, pkgUrl string) error + + // Stop all function instances + StopFunction(tenant, namespace, name string) error + + // Stop function instance + StopFunctionWithID(tenant, namespace, name string, instanceID int) error } type functions struct { @@ -174,3 +180,24 @@ func (f *functions) CreateFuncWithUrl(funcConf *FunctionConfig, pkgUrl string) e return nil } + +func (f *functions) StopFunction(tenant, namespace, name string) error { + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name) + err := f.client.post(endpoint+"/stop", "", nil) + if err != nil { + return err + } + return nil +} + +func (f *functions) StopFunctionWithID(tenant, namespace, name string, instanceID int) error { + id := fmt.Sprintf("%d", instanceID) + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, id) + + err := f.client.post(endpoint+"/stop", "", nil) + if err != nil { + return err + } + + return nil +}