From 9c3e7aa60b2f106e2c068574f3576c7c65b8aeb6 Mon Sep 17 00:00:00 2001 From: "xiaolong.ran" Date: Tue, 3 Sep 2019 14:09:06 +0800 Subject: [PATCH 1/7] Add delete cmd for Pulsar Functions Signed-off-by: xiaolong.ran --- pkg/ctl/functions/delete.go | 125 +++++++++++++++++++++++++++++++ pkg/ctl/functions/delete_test.go | 73 ++++++++++++++++++ pkg/ctl/functions/functions.go | 1 + pkg/pulsar/functions.go | 18 ++--- 4 files changed, 207 insertions(+), 10 deletions(-) create mode 100644 pkg/ctl/functions/delete.go create mode 100644 pkg/ctl/functions/delete_test.go diff --git a/pkg/ctl/functions/delete.go b/pkg/ctl/functions/delete.go new file mode 100644 index 00000000..64f4f33c --- /dev/null +++ b/pkg/ctl/functions/delete.go @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" +) + +func deleteFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "This command is used for delete a Pulsar Function that is running on a Pulsar cluster." + desc.CommandPermission = "This command requires super-user permissions." + + var examples []pulsar.Example + + del := pulsar.Example{ + Desc: "Delete a Pulsar Function that is running on a Pulsar cluster", + Command: "pulsarctl functions create \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name ", + } + examples = append(examples, del) + + delWithInstanceID := pulsar.Example{ + Desc: "Delete a Pulsar Function that is running on a Pulsar cluster with instance ID", + Command: "pulsarctl functions create \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name \n" + + "\t--instance-id 1", + } + examples = append(examples, delWithInstanceID) + + delWithFqfn := pulsar.Example{ + Desc: "Delete a Pulsar Function that is running on a Pulsar cluster with FQFN", + Command: "pulsarctl functions delete \n" + + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions]", + } + examples = append(examples, delWithFqfn) + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normal output", + Out: "Deleted successfully", + } + + out = append(out, successOut) + desc.CommandOutput = out + + vc.SetDescription( + "delete", + "", + desc.ToString(), + "delete", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFunc(func() error { + return doDeleteFunctions(vc, functionData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.FQFN, + "fqfn", + "", + "The Fully Qualified Function Name (FQFN) for the function") + + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + + flagSet.StringVar( + &functionData.FuncName, + "name", + "", + "The name of a Pulsar Function") + }) +} + +func doDeleteFunctions(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + err := processBaseArguments(funcData) + if err != nil { + vc.Command.Help() + return err + } + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + err = admin.Functions().DeleteFunction(funcData.Tenant, funcData.Namespace, funcData.FuncName) + if err != nil { + return err + } + + vc.Command.Printf("Deleted successfully") + return nil +} diff --git a/pkg/ctl/functions/delete_test.go b/pkg/ctl/functions/delete_test.go new file mode 100644 index 00000000..cdf58530 --- /dev/null +++ b/pkg/ctl/functions/delete_test.go @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/stretchr/testify/assert" + "os" + "testing" +) + +func TestDeleteFunctions(t *testing.T) { + jarName := "dummyExample.jar" + _, err := os.Create(jarName) + assert.Nil(t, err) + + defer os.Remove(jarName) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-delete", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, err = TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + + deleteArgs := []string{"delete", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-delete", + } + + _, err = TestFunctionsCommands(deleteFunctionsCmd, deleteArgs) + assert.Nil(t, err) + + argsFqfn := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-delete-fqfn", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, err = TestFunctionsCommands(createFunctionsCmd, argsFqfn) + assert.Nil(t, err) + + deleteArgsFqfn := []string{"delete", + "--fqfn", "public/default/test-functions-delete-fqfn", + } + + _, err = TestFunctionsCommands(deleteFunctionsCmd, deleteArgsFqfn) + assert.Nil(t, err) +} diff --git a/pkg/ctl/functions/functions.go b/pkg/ctl/functions/functions.go index e92891f5..9bdb8e07 100644 --- a/pkg/ctl/functions/functions.go +++ b/pkg/ctl/functions/functions.go @@ -33,6 +33,7 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command { cmdutils.AddVerbCmd(flagGrouping, resourceCmd, createFunctionsCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, stopFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, deleteFunctionsCmd) return resourceCmd } diff --git a/pkg/pulsar/functions.go b/pkg/pulsar/functions.go index d5352039..a2d5555a 100644 --- a/pkg/pulsar/functions.go +++ b/pkg/pulsar/functions.go @@ -49,6 +49,8 @@ type Functions interface { // Stop function instance StopFunctionWithID(tenant, namespace, name string, instanceID int) error + + DeleteFunction(tenant, namespace, name string) error } type functions struct { @@ -183,21 +185,17 @@ func (f *functions) CreateFuncWithUrl(funcConf *FunctionConfig, pkgUrl string) e func (f *functions) StopFunction(tenant, namespace, name string) error { endpoint := f.client.endpoint(f.basePath, tenant, namespace, name) - err := f.client.post(endpoint+"/stop", "", nil) - if err != nil { - return err - } - return nil + return f.client.post(endpoint+"/stop", "", nil) } func (f *functions) StopFunctionWithID(tenant, namespace, name string, instanceID int) error { id := fmt.Sprintf("%d", instanceID) endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, id) - err := f.client.post(endpoint+"/stop", "", nil) - if err != nil { - return err - } + return f.client.post(endpoint+"/stop", "", nil) +} - return nil +func (f *functions) DeleteFunction(tenant, namespace, name string) error { + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name) + return f.client.delete(endpoint, nil) } From 7cdab3f5f18805544a36e7f8fd6a619a33c02390 Mon Sep 17 00:00:00 2001 From: "xiaolong.ran" Date: Tue, 3 Sep 2019 14:55:37 +0800 Subject: [PATCH 2/7] fix comments Signed-off-by: xiaolong.ran --- pkg/ctl/functions/delete.go | 18 ++++++++++--- pkg/ctl/functions/delete_test.go | 34 ++++++++++++++++++++++++ pkg/ctl/functions/stop.go | 21 +++++++++++++-- pkg/ctl/functions/stop_test.go | 44 ++++++++++++++++++++++++++++++++ 4 files changed, 111 insertions(+), 6 deletions(-) diff --git a/pkg/ctl/functions/delete.go b/pkg/ctl/functions/delete.go index 64f4f33c..b8be316b 100644 --- a/pkg/ctl/functions/delete.go +++ b/pkg/ctl/functions/delete.go @@ -32,7 +32,7 @@ func deleteFunctionsCmd(vc *cmdutils.VerbCmd) { del := pulsar.Example{ Desc: "Delete a Pulsar Function that is running on a Pulsar cluster", - Command: "pulsarctl functions create \n" + + Command: "pulsarctl functions delete \n" + "\t--tenant public\n" + "\t--namespace default\n" + "\t--name ", @@ -41,7 +41,7 @@ func deleteFunctionsCmd(vc *cmdutils.VerbCmd) { delWithInstanceID := pulsar.Example{ Desc: "Delete a Pulsar Function that is running on a Pulsar cluster with instance ID", - Command: "pulsarctl functions create \n" + + Command: "pulsarctl functions delete \n" + "\t--tenant public\n" + "\t--namespace default\n" + "\t--name \n" + @@ -59,11 +59,21 @@ func deleteFunctionsCmd(vc *cmdutils.VerbCmd) { var out []pulsar.Output successOut := pulsar.Output{ - Desc: "normal output", + Desc: " normal output", Out: "Deleted successfully", } - out = append(out, successOut) + failOut := pulsar.Output{ + Desc: " You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + } + + failOutWithNameNotExist := pulsar.Output{ + Desc: " The name of Pulsar Functions doesn't exist, please check the --name args", + Out: "[✖] code: 404 reason: Function doesn't exist", + } + + out = append(out, successOut, failOut, failOutWithNameNotExist) desc.CommandOutput = out vc.SetDescription( diff --git a/pkg/ctl/functions/delete_test.go b/pkg/ctl/functions/delete_test.go index cdf58530..b6fc756e 100644 --- a/pkg/ctl/functions/delete_test.go +++ b/pkg/ctl/functions/delete_test.go @@ -71,3 +71,37 @@ func TestDeleteFunctions(t *testing.T) { _, err = TestFunctionsCommands(deleteFunctionsCmd, deleteArgsFqfn) assert.Nil(t, err) } + +func TestDeleteFunctionsWithFailure(t *testing.T) { + jarName := "dummyExample.jar" + _, err := os.Create(jarName) + assert.Nil(t, err) + + defer os.Remove(jarName) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-delete-failure", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, err = TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + + failureDeleteArgs := []string{"delete", + "--name", "not-exist", + } + + _, err = TestFunctionsCommands(createFunctionsCmd, failureDeleteArgs) + assert.NotNil(t, err) + + notExistNameOrFqfnArgs := []string{"delete", + "--tenant", "public", + "--namespace", "default", + } + _, err = TestFunctionsCommands(createFunctionsCmd, notExistNameOrFqfnArgs) + assert.NotNil(t, err) +} diff --git a/pkg/ctl/functions/stop.go b/pkg/ctl/functions/stop.go index 1844e67c..63cdb54d 100644 --- a/pkg/ctl/functions/stop.go +++ b/pkg/ctl/functions/stop.go @@ -60,11 +60,28 @@ func stopFunctionsCmd(vc *cmdutils.VerbCmd) { var out []pulsar.Output successOut := pulsar.Output{ - Desc: "normal output", + Desc: " normal output", Out: "Stopped successfully", } - out = append(out, successOut) + failOut := pulsar.Output{ + Desc: " You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + } + + failOutWithNameNotExist := pulsar.Output{ + Desc: " The name of Pulsar Functions doesn't exist, please check the --name args", + Out: "[✖] code: 404 reason: Function doesn't exist", + } + + //[✖] code: 400 reason: Operation not permitted + + failOutWithWrongInstanceID := pulsar.Output{ + Desc: " Used an instanceID that does not exist or other impermissible actions", + Out: "[✖] code: 400 reason: Operation not permitted", + } + + out = append(out, successOut, failOut, failOutWithNameNotExist, failOutWithWrongInstanceID) desc.CommandOutput = out vc.SetDescription( diff --git a/pkg/ctl/functions/stop_test.go b/pkg/ctl/functions/stop_test.go index f574cc42..2e7eced5 100644 --- a/pkg/ctl/functions/stop_test.go +++ b/pkg/ctl/functions/stop_test.go @@ -71,3 +71,47 @@ func TestStopFunctions(t *testing.T) { _, err = TestFunctionsCommands(stopFunctionsCmd, stopArgsFqfn) assert.Nil(t, err) } + +func TestStopFunctionsWithFailure(t *testing.T) { + jarName := "dummyExample.jar" + _, err := os.Create(jarName) + assert.Nil(t, err) + + defer os.Remove(jarName) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-stop-failure", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, err = TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + + failureDeleteArgs := []string{"stop", + "--name", "not-exist", + } + + _, err = TestFunctionsCommands(createFunctionsCmd, failureDeleteArgs) + assert.NotNil(t, err) + + notExistNameOrFqfnArgs := []string{"stop", + "--tenant", "public", + "--namespace", "default", + } + _, err = TestFunctionsCommands(createFunctionsCmd, notExistNameOrFqfnArgs) + assert.NotNil(t, err) + + notExistInstanceIDArgs := []string{"stop", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-stop-failure", + "--instance-id", "12345678", + } + + _, err = TestFunctionsCommands(createFunctionsCmd, notExistInstanceIDArgs) + assert.NotNil(t, err) +} From a568e78778b0ea7333e674a5d97fb8f3f51862ba Mon Sep 17 00:00:00 2001 From: "xiaolong.ran" Date: Tue, 3 Sep 2019 17:16:19 +0800 Subject: [PATCH 3/7] fix comments Signed-off-by: xiaolong.ran --- pkg/ctl/cluster/test.go | 24 ++++++------- pkg/ctl/functions/create_test.go | 6 ++-- pkg/ctl/functions/delete.go | 20 +++++------ pkg/ctl/functions/delete_test.go | 26 ++++++++------ pkg/ctl/functions/stop.go | 14 ++++---- pkg/ctl/functions/stop_test.go | 62 ++++++++++++++++++-------------- pkg/ctl/functions/test_help.go | 9 +++-- 7 files changed, 88 insertions(+), 73 deletions(-) diff --git a/pkg/ctl/cluster/test.go b/pkg/ctl/cluster/test.go index cd0537e6..b308c1b9 100644 --- a/pkg/ctl/cluster/test.go +++ b/pkg/ctl/cluster/test.go @@ -8,8 +8,7 @@ import ( "os" ) -func TestClusterCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (out *bytes.Buffer, execErr, nameErr, err error) { - +func TestClusterCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (out *bytes.Buffer, execErr, nameErr, err error) { var execError error cmdutils.ExecErrorHandler = func(err error) { execError = err @@ -20,9 +19,9 @@ func TestClusterCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (ou nameError = err } - var rootCmd = &cobra.Command { - Use: "pulsarctl [command]", - Short: "a CLI for Apache Pulsar", + var rootCmd = &cobra.Command{ + Use: "pulsarctl [command]", + Short: "a CLI for Apache Pulsar", Run: func(cmd *cobra.Command, _ []string) { if err := cmd.Help(); err != nil { logger.Debug("ignoring error %q", err.Error()) @@ -34,7 +33,6 @@ func TestClusterCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (ou rootCmd.SetOut(buf) rootCmd.SetArgs(append([]string{"clusters"}, args...)) - resourceCmd := cmdutils.NewResourceCmd( "clusters", "Operations about cluster(s)", @@ -53,10 +51,10 @@ var ( basePath string ) -func TestTlsHelp(newVerb func(cmd *cmdutils.VerbCmd), args []string)(out *bytes.Buffer, err error) { - var rootCmd = &cobra.Command { - Use: "pulsarctl [command]", - Short: "a CLI for Apache Pulsar", +func TestTlsHelp(newVerb func(cmd *cmdutils.VerbCmd), args []string) (out *bytes.Buffer, err error) { + var rootCmd = &cobra.Command{ + Use: "pulsarctl [command]", + Short: "a CLI for Apache Pulsar", Run: func(cmd *cobra.Command, _ []string) { if err := cmd.Help(); err != nil { logger.Debug("ignoring error %q", err.Error()) @@ -84,9 +82,9 @@ func TestTlsHelp(newVerb func(cmd *cmdutils.VerbCmd), args []string)(out *bytes. baseArgs := []string{ "--auth-params", - "{\"tlsCertFile\":\""+basePath+"/test/auth/certs/client-cert.pem\""+ - ",\"tlsKeyFile\":\""+basePath+"/test/auth/certs/client-key.pem\"}", - "--tls-trust-cert-pat", basePath+"/test/auth/certs/cacert.pem", + "{\"tlsCertFile\":\"" + basePath + "/test/auth/certs/client-cert.pem\"" + + ",\"tlsKeyFile\":\"" + basePath + "/test/auth/certs/client-key.pem\"}", + "--tls-trust-cert-pat", basePath + "/test/auth/certs/cacert.pem", "--admin-service-url", "https://localhost:8443", "--tls-allow-insecure"} diff --git a/pkg/ctl/functions/create_test.go b/pkg/ctl/functions/create_test.go index d7bb6509..43f8f0cb 100644 --- a/pkg/ctl/functions/create_test.go +++ b/pkg/ctl/functions/create_test.go @@ -55,7 +55,7 @@ func TestCreateFunctions(t *testing.T) { "--processing-guarantees", "EFFECTIVELY_ONCE", } - _, err = TestFunctionsCommands(createFunctionsCmd, args) + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) assert.Nil(t, err) // $ bin/pulsar-admin functions create @@ -66,7 +66,7 @@ func TestCreateFunctions(t *testing.T) { "--jar", jarName, } - _, err = TestFunctionsCommands(createFunctionsCmd, argsWithConf) + _, _, err = TestFunctionsCommands(createFunctionsCmd, argsWithConf) assert.Nil(t, err) argsWithFileUrl := []string{"create", @@ -80,6 +80,6 @@ func TestCreateFunctions(t *testing.T) { "--processing-guarantees", "EFFECTIVELY_ONCE", } - _, err = TestFunctionsCommands(createFunctionsCmd, argsWithFileUrl) + _, _, err = TestFunctionsCommands(createFunctionsCmd, argsWithFileUrl) assert.Nil(t, err) } diff --git a/pkg/ctl/functions/delete.go b/pkg/ctl/functions/delete.go index b8be316b..06f1d38b 100644 --- a/pkg/ctl/functions/delete.go +++ b/pkg/ctl/functions/delete.go @@ -59,26 +59,26 @@ func deleteFunctionsCmd(vc *cmdutils.VerbCmd) { var out []pulsar.Output successOut := pulsar.Output{ - Desc: " normal output", + Desc: "normal output", Out: "Deleted successfully", } - failOut := pulsar.Output{ - Desc: " You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", - Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", - } + failOut := pulsar.Output{ + Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + } - failOutWithNameNotExist := pulsar.Output{ - Desc: " The name of Pulsar Functions doesn't exist, please check the --name args", - Out: "[✖] code: 404 reason: Function doesn't exist", - } + failOutWithNameNotExist := pulsar.Output{ + Desc: "The name of Pulsar Functions doesn't exist, please check the --name args", + Out: "[✖] code: 404 reason: Function doesn't exist", + } out = append(out, successOut, failOut, failOutWithNameNotExist) desc.CommandOutput = out vc.SetDescription( "delete", - "", + "Delete a Pulsar Function that is running on a Pulsar cluster", desc.ToString(), "delete", ) diff --git a/pkg/ctl/functions/delete_test.go b/pkg/ctl/functions/delete_test.go index b6fc756e..6b06f220 100644 --- a/pkg/ctl/functions/delete_test.go +++ b/pkg/ctl/functions/delete_test.go @@ -20,6 +20,7 @@ package functions import ( "github.com/stretchr/testify/assert" "os" + "strings" "testing" ) @@ -39,7 +40,7 @@ func TestDeleteFunctions(t *testing.T) { "--jar", jarName, } - _, err = TestFunctionsCommands(createFunctionsCmd, args) + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) assert.Nil(t, err) deleteArgs := []string{"delete", @@ -48,7 +49,7 @@ func TestDeleteFunctions(t *testing.T) { "--name", "test-functions-delete", } - _, err = TestFunctionsCommands(deleteFunctionsCmd, deleteArgs) + _, _, err = TestFunctionsCommands(deleteFunctionsCmd, deleteArgs) assert.Nil(t, err) argsFqfn := []string{"create", @@ -61,18 +62,18 @@ func TestDeleteFunctions(t *testing.T) { "--jar", jarName, } - _, err = TestFunctionsCommands(createFunctionsCmd, argsFqfn) + _, _, err = TestFunctionsCommands(createFunctionsCmd, argsFqfn) assert.Nil(t, err) deleteArgsFqfn := []string{"delete", "--fqfn", "public/default/test-functions-delete-fqfn", } - _, err = TestFunctionsCommands(deleteFunctionsCmd, deleteArgsFqfn) + _, _, err = TestFunctionsCommands(deleteFunctionsCmd, deleteArgsFqfn) assert.Nil(t, err) } -func TestDeleteFunctionsWithFailure(t *testing.T) { +func TestDeleteFunctionsWithFailure(t *testing.T) { jarName := "dummyExample.jar" _, err := os.Create(jarName) assert.Nil(t, err) @@ -88,20 +89,25 @@ func TestDeleteFunctionsWithFailure(t *testing.T) { "--jar", jarName, } - _, err = TestFunctionsCommands(createFunctionsCmd, args) + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) failureDeleteArgs := []string{"delete", "--name", "not-exist", } - _, err = TestFunctionsCommands(createFunctionsCmd, failureDeleteArgs) - assert.NotNil(t, err) + _, execErrMsg, _ := TestFunctionsCommands(deleteFunctionsCmd, failureDeleteArgs) + assert.NotNil(t, execErrMsg) + exceptMsg := "Function not-exist doesn't exist" + assert.True(t, strings.ContainsAny(execErrMsg.Error(), exceptMsg)) notExistNameOrFqfnArgs := []string{"delete", "--tenant", "public", "--namespace", "default", } - _, err = TestFunctionsCommands(createFunctionsCmd, notExistNameOrFqfnArgs) - assert.NotNil(t, err) + _, execErrMsg, _ = TestFunctionsCommands(deleteFunctionsCmd, notExistNameOrFqfnArgs) + failMsg := "you must specify a name for the function or a Fully Qualified Function Name (FQFN)" + assert.NotNil(t, execErrMsg) + assert.True(t, strings.ContainsAny(execErrMsg.Error(), failMsg)) } diff --git a/pkg/ctl/functions/stop.go b/pkg/ctl/functions/stop.go index 63cdb54d..e1184f4c 100644 --- a/pkg/ctl/functions/stop.go +++ b/pkg/ctl/functions/stop.go @@ -60,24 +60,22 @@ func stopFunctionsCmd(vc *cmdutils.VerbCmd) { var out []pulsar.Output successOut := pulsar.Output{ - Desc: " normal output", + Desc: "normal output", Out: "Stopped successfully", } failOut := pulsar.Output{ - Desc: " You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", - Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", } failOutWithNameNotExist := pulsar.Output{ - Desc: " The name of Pulsar Functions doesn't exist, please check the --name args", + Desc: "The name of Pulsar Functions doesn't exist, please check the --name args", Out: "[✖] code: 404 reason: Function doesn't exist", } - //[✖] code: 400 reason: Operation not permitted - failOutWithWrongInstanceID := pulsar.Output{ - Desc: " Used an instanceID that does not exist or other impermissible actions", + Desc: "Used an instanceID that does not exist or other impermissible actions", Out: "[✖] code: 400 reason: Operation not permitted", } @@ -86,7 +84,7 @@ func stopFunctionsCmd(vc *cmdutils.VerbCmd) { vc.SetDescription( "stop", - "", + "Stops function instance", desc.ToString(), "stop", ) diff --git a/pkg/ctl/functions/stop_test.go b/pkg/ctl/functions/stop_test.go index 2e7eced5..908160fd 100644 --- a/pkg/ctl/functions/stop_test.go +++ b/pkg/ctl/functions/stop_test.go @@ -20,6 +20,7 @@ package functions import ( "github.com/stretchr/testify/assert" "os" + "strings" "testing" ) @@ -39,7 +40,7 @@ func TestStopFunctions(t *testing.T) { "--jar", jarName, } - _, err = TestFunctionsCommands(createFunctionsCmd, args) + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) assert.Nil(t, err) stopArgs := []string{"stop", @@ -48,28 +49,28 @@ func TestStopFunctions(t *testing.T) { "--name", "test-functions-stop", } - _, err = TestFunctionsCommands(stopFunctionsCmd, stopArgs) + _, _, err = TestFunctionsCommands(stopFunctionsCmd, stopArgs) assert.Nil(t, err) - argsFqfn := []string{"create", - "--tenant", "public", - "--namespace", "default", - "--name", "test-functions-stop-fqfn", - "--inputs", "test-input-topic", - "--output", "persistent://public/default/test-output-topic", - "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", - "--jar", jarName, - } - - _, err = TestFunctionsCommands(createFunctionsCmd, argsFqfn) - assert.Nil(t, err) - - stopArgsFqfn := []string{"stop", - "--fqfn", "public/default/test-functions-stop-fqfn", - } - - _, err = TestFunctionsCommands(stopFunctionsCmd, stopArgsFqfn) - assert.Nil(t, err) + argsFqfn := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-stop-fqfn", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, argsFqfn) + assert.Nil(t, err) + + stopArgsFqfn := []string{"stop", + "--fqfn", "public/default/test-functions-stop-fqfn", + } + + _, _, err = TestFunctionsCommands(stopFunctionsCmd, stopArgsFqfn) + assert.Nil(t, err) } func TestStopFunctionsWithFailure(t *testing.T) { @@ -88,30 +89,37 @@ func TestStopFunctionsWithFailure(t *testing.T) { "--jar", jarName, } - _, err = TestFunctionsCommands(createFunctionsCmd, args) + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) assert.Nil(t, err) + // test the function name not exist failureDeleteArgs := []string{"stop", "--name", "not-exist", } - - _, err = TestFunctionsCommands(createFunctionsCmd, failureDeleteArgs) + _, err, _ = TestFunctionsCommands(stopFunctionsCmd, failureDeleteArgs) assert.NotNil(t, err) + failMsg := "Function not-exist doesn't exist" + assert.True(t, strings.ContainsAny(err.Error(), failMsg)) + // test the --name args not exist notExistNameOrFqfnArgs := []string{"stop", "--tenant", "public", "--namespace", "default", } - _, err = TestFunctionsCommands(createFunctionsCmd, notExistNameOrFqfnArgs) + _, err, _ = TestFunctionsCommands(stopFunctionsCmd, notExistNameOrFqfnArgs) assert.NotNil(t, err) + failNameMsg := "you must specify a name for the function or a Fully Qualified Function Name (FQFN)" + assert.True(t, strings.ContainsAny(err.Error(), failNameMsg)) + // test the instance id not exist notExistInstanceIDArgs := []string{"stop", "--tenant", "public", "--namespace", "default", "--name", "test-functions-stop-failure", "--instance-id", "12345678", } - - _, err = TestFunctionsCommands(createFunctionsCmd, notExistInstanceIDArgs) + _, err, _ = TestFunctionsCommands(stopFunctionsCmd, notExistInstanceIDArgs) assert.NotNil(t, err) + failInstanceIDMsg := "Operation not permitted" + assert.True(t, strings.ContainsAny(err.Error(), failInstanceIDMsg)) } diff --git a/pkg/ctl/functions/test_help.go b/pkg/ctl/functions/test_help.go index bafc2aba..b1ba4318 100644 --- a/pkg/ctl/functions/test_help.go +++ b/pkg/ctl/functions/test_help.go @@ -25,7 +25,7 @@ import ( "os" ) -func TestFunctionsCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (out *bytes.Buffer, err error) { +func TestFunctionsCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (out *bytes.Buffer, execErr, err error) { var rootCmd = &cobra.Command{ Use: "pulsarctl [command]", Short: "a CLI for Apache Pulsar", @@ -36,6 +36,11 @@ func TestFunctionsCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) ( }, } + var execError error + cmdutils.ExecErrorHandler = func(err error) { + execError = err + } + buf := new(bytes.Buffer) rootCmd.SetOut(buf) rootCmd.SetArgs(append([]string{"functions"}, args...)) @@ -50,7 +55,7 @@ func TestFunctionsCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) ( rootCmd.AddCommand(resourceCmd) err = rootCmd.Execute() - return buf, err + return buf, execError, err } var ( From c14b080a9d0d5a8caabf1f07e01d8b550412ffad Mon Sep 17 00:00:00 2001 From: "xiaolong.ran" Date: Tue, 3 Sep 2019 17:51:33 +0800 Subject: [PATCH 4/7] Add start cmd for Pulsar Functions Signed-off-by: xiaolong.ran --- pkg/ctl/functions/functions.go | 1 + pkg/ctl/functions/start.go | 161 ++++++++++++++++++++++++++++++++ pkg/ctl/functions/start_test.go | 129 +++++++++++++++++++++++++ pkg/pulsar/functions.go | 20 ++++ 4 files changed, 311 insertions(+) create mode 100644 pkg/ctl/functions/start.go create mode 100644 pkg/ctl/functions/start_test.go diff --git a/pkg/ctl/functions/functions.go b/pkg/ctl/functions/functions.go index 9bdb8e07..1aa05e48 100644 --- a/pkg/ctl/functions/functions.go +++ b/pkg/ctl/functions/functions.go @@ -34,6 +34,7 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command { cmdutils.AddVerbCmd(flagGrouping, resourceCmd, createFunctionsCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, stopFunctionsCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, deleteFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, startFunctionsCmd) return resourceCmd } diff --git a/pkg/ctl/functions/start.go b/pkg/ctl/functions/start.go new file mode 100644 index 00000000..ca865305 --- /dev/null +++ b/pkg/ctl/functions/start.go @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "strconv" +) + +func startFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "This command is used for starting a stopped function instance." + desc.CommandPermission = "This command requires super-user permissions." + + var examples []pulsar.Example + + start := pulsar.Example{ + Desc: "Starts a stopped function instance", + Command: "pulsarctl functions start \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name ", + } + examples = append(examples, start) + + startWithInstanceID := pulsar.Example{ + Desc: "Starts a stopped function instance with instance ID", + Command: "pulsarctl functions start \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name \n" + + "\t--instance-id 1", + } + examples = append(examples, startWithInstanceID) + + startWithFQFN := pulsar.Example{ + Desc: "Starts a stopped function instance with FQFN", + Command: "pulsarctl functions start \n" + + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions]", + } + examples = append(examples, startWithFQFN) + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normal output", + Out: "Started successfully", + } + + failOut := pulsar.Output{ + Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + } + + failOutWithNameNotExist := pulsar.Output{ + Desc: "The name of Pulsar Functions doesn't exist, please check the --name args", + Out: "[✖] code: 404 reason: Function doesn't exist", + } + + failOutWithWrongInstanceID := pulsar.Output{ + Desc: "Used an instanceID that does not exist or other impermissible actions", + Out: "[✖] code: 400 reason: Operation not permitted", + } + + out = append(out, successOut, failOut, failOutWithNameNotExist, failOutWithWrongInstanceID) + desc.CommandOutput = out + + vc.SetDescription( + "start", + "Starts a stopped function instance", + desc.ToString(), + "start", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFunc(func() error { + return doStartFunctions(vc, functionData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.FQFN, + "fqfn", + "", + "The Fully Qualified Function Name (FQFN) for the function") + + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + + flagSet.StringVar( + &functionData.FuncName, + "name", + "", + "The name of a Pulsar Function") + + flagSet.StringVar( + &functionData.InstanceID, + "instance-id", + "", + "The function instanceId (start all instances if instance-id is not provided)") + }) +} + +func doStartFunctions(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + err := processBaseArguments(funcData) + if err != nil { + vc.Command.Help() + return err + } + + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + if funcData.InstanceID != "" { + instanceID, err := strconv.Atoi(funcData.InstanceID) + if err != nil { + return err + } + err = admin.Functions().StartFunctionWithID(funcData.Tenant, funcData.Namespace, funcData.FuncName, instanceID) + if err != nil { + return err + } + vc.Command.Printf("Started %s successfully", funcData.FuncName) + } else { + err = admin.Functions().StartFunction(funcData.Tenant, funcData.Namespace, funcData.FuncName) + if err != nil { + return err + } + + vc.Command.Printf("Started %s successfully", funcData.FuncName) + } + + return nil +} diff --git a/pkg/ctl/functions/start_test.go b/pkg/ctl/functions/start_test.go new file mode 100644 index 00000000..38100a48 --- /dev/null +++ b/pkg/ctl/functions/start_test.go @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/stretchr/testify/assert" + "os" + "strings" + "testing" +) + +func TestStartFunctions(t *testing.T) { + jarName := "dummyExample.jar" + _, err := os.Create(jarName) + assert.Nil(t, err) + + defer os.Remove(jarName) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-start", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + + stopArgs := []string{"stop", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-start", + } + + _, _, err = TestFunctionsCommands(stopFunctionsCmd, stopArgs) + assert.Nil(t, err) + + startArgs := []string{"start", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-start", + } + + _, _, err = TestFunctionsCommands(startFunctionsCmd, startArgs) + assert.Nil(t, err) + + argsFqfn := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-start-fqfn", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, argsFqfn) + assert.Nil(t, err) + + stopArgsFqfn := []string{"stop", + "--fqfn", "public/default/test-functions-start-fqfn", + } + + _, _, err = TestFunctionsCommands(stopFunctionsCmd, stopArgsFqfn) + assert.Nil(t, err) + + startArgsFqfn := []string{"start", + "--fqfn", "public/default/test-functions-start-fqfn", + } + + _, _, err = TestFunctionsCommands(startFunctionsCmd, startArgsFqfn) + assert.Nil(t, err) + + // test failure cases + + stopArgsFqfnAgain := []string{"stop", + "--fqfn", "public/default/test-functions-start-fqfn", + } + _, _, err = TestFunctionsCommands(stopFunctionsCmd, stopArgsFqfnAgain) + assert.Nil(t, err) + + // test the function name not exist + failureStartArgs := []string{"start", + "--name", "not-exist", + } + _, err, _ = TestFunctionsCommands(startFunctionsCmd, failureStartArgs) + assert.NotNil(t, err) + failMsg := "Function not-exist doesn't exist" + assert.True(t, strings.ContainsAny(err.Error(), failMsg)) + + // test the --name args not exist + notExistNameOrFqfnArgs := []string{"start", + "--tenant", "public", + "--namespace", "default", + } + _, err, _ = TestFunctionsCommands(startFunctionsCmd, notExistNameOrFqfnArgs) + assert.NotNil(t, err) + failNameMsg := "you must specify a name for the function or a Fully Qualified Function Name (FQFN)" + assert.True(t, strings.ContainsAny(err.Error(), failNameMsg)) + + // test the instance id not exist + notExistInstanceIDArgs := []string{"start", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-start-fqfn", + "--instance-id", "12345678", + } + _, err, _ = TestFunctionsCommands(startFunctionsCmd, notExistInstanceIDArgs) + assert.NotNil(t, err) + failInstanceIDMsg := "Operation not permitted" + assert.True(t, strings.ContainsAny(err.Error(), failInstanceIDMsg)) +} diff --git a/pkg/pulsar/functions.go b/pkg/pulsar/functions.go index a2d5555a..71815ca0 100644 --- a/pkg/pulsar/functions.go +++ b/pkg/pulsar/functions.go @@ -50,7 +50,14 @@ type Functions interface { // Stop function instance StopFunctionWithID(tenant, namespace, name string, instanceID int) error + // Delete an existing function DeleteFunction(tenant, namespace, name string) error + + // Start all function instances + StartFunction(tenant, namespace, name string) error + + // Start function instance + StartFunctionWithID(tenant, namespace, name string, instanceID int) error } type functions struct { @@ -199,3 +206,16 @@ func (f *functions) DeleteFunction(tenant, namespace, name string) error { endpoint := f.client.endpoint(f.basePath, tenant, namespace, name) return f.client.delete(endpoint, nil) } + +func (f *functions) StartFunction(tenant, namespace, name string) error { + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name) + return f.client.post(endpoint+"/start", "", nil) +} + +func (f *functions) StartFunctionWithID(tenant, namespace, name string, instanceID int) error { + id := fmt.Sprintf("%d", instanceID) + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, id) + + return f.client.post(endpoint+"/start", "", nil) +} + From 3497ec8f64b434f21b63595ebac2a2299c46852a Mon Sep 17 00:00:00 2001 From: "xiaolong.ran" Date: Tue, 3 Sep 2019 18:14:46 +0800 Subject: [PATCH 5/7] Add restart cmd for Pulsar Functions Signed-off-by: xiaolong.ran --- pkg/ctl/functions/functions.go | 1 + pkg/ctl/functions/restart.go | 160 ++++++++++++++++++++++++++++++ pkg/ctl/functions/restart_test.go | 125 +++++++++++++++++++++++ pkg/pulsar/functions.go | 18 ++++ 4 files changed, 304 insertions(+) create mode 100644 pkg/ctl/functions/restart.go create mode 100644 pkg/ctl/functions/restart_test.go diff --git a/pkg/ctl/functions/functions.go b/pkg/ctl/functions/functions.go index 1aa05e48..a7ccb832 100644 --- a/pkg/ctl/functions/functions.go +++ b/pkg/ctl/functions/functions.go @@ -35,6 +35,7 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command { cmdutils.AddVerbCmd(flagGrouping, resourceCmd, stopFunctionsCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, deleteFunctionsCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, startFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, restartFunctionsCmd) return resourceCmd } diff --git a/pkg/ctl/functions/restart.go b/pkg/ctl/functions/restart.go new file mode 100644 index 00000000..6da769bd --- /dev/null +++ b/pkg/ctl/functions/restart.go @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "strconv" +) + +func restartFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "This command is used for restarting function instance." + desc.CommandPermission = "This command requires super-user permissions." + + var examples []pulsar.Example + + restart := pulsar.Example{ + Desc: "Restart function instance", + Command: "pulsarctl functions restart \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name ", + } + examples = append(examples, restart) + + restartWithInstanceID := pulsar.Example{ + Desc: "Restart function instance with instance ID", + Command: "pulsarctl functions restart \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name \n" + + "\t--instance-id 1", + } + examples = append(examples, restartWithInstanceID) + + restartWithFQFN := pulsar.Example{ + Desc: "Restart function instance with FQFN", + Command: "pulsarctl functions restart \n" + + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions]", + } + examples = append(examples, restartWithFQFN) + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normal output", + Out: "Restarted successfully", + } + + failOut := pulsar.Output{ + Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + } + + failOutWithNameNotExist := pulsar.Output{ + Desc: "The name of Pulsar Functions doesn't exist, please check the --name args", + Out: "[✖] code: 404 reason: Function doesn't exist", + } + + failOutWithWrongInstanceID := pulsar.Output{ + Desc: "Used an instanceID that does not exist or other impermissible actions", + Out: "[✖] code: 400 reason: Operation not permitted", + } + + out = append(out, successOut, failOut, failOutWithNameNotExist, failOutWithWrongInstanceID) + desc.CommandOutput = out + + vc.SetDescription( + "restart", + "Restart function instance", + desc.ToString(), + "restart", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFunc(func() error { + return doRestartFunctions(vc, functionData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.FQFN, + "fqfn", + "", + "The Fully Qualified Function Name (FQFN) for the function") + + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + + flagSet.StringVar( + &functionData.FuncName, + "name", + "", + "The name of a Pulsar Function") + + flagSet.StringVar( + &functionData.InstanceID, + "instance-id", + "", + "The function instanceId (restart all instances if instance-id is not provided)") + }) +} + +func doRestartFunctions(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + err := processBaseArguments(funcData) + if err != nil { + vc.Command.Help() + return err + } + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + if funcData.InstanceID != "" { + instanceID, err := strconv.Atoi(funcData.InstanceID) + if err != nil { + return err + } + err = admin.Functions().RestartFunctionWithID(funcData.Tenant, funcData.Namespace, funcData.FuncName, instanceID) + if err != nil { + return err + } + vc.Command.Printf("Restarted %s successfully", funcData.FuncName) + } else { + err = admin.Functions().RestartFunction(funcData.Tenant, funcData.Namespace, funcData.FuncName) + if err != nil { + return err + } + + vc.Command.Printf("Restarted %s successfully", funcData.FuncName) + } + + return nil +} diff --git a/pkg/ctl/functions/restart_test.go b/pkg/ctl/functions/restart_test.go new file mode 100644 index 00000000..25f99fdc --- /dev/null +++ b/pkg/ctl/functions/restart_test.go @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/stretchr/testify/assert" + "os" + "strings" + "testing" +) + +func TestRestartFunctions(t *testing.T) { + jarName := "dummyExample.jar" + _, err := os.Create(jarName) + assert.Nil(t, err) + + defer os.Remove(jarName) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-restart", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + + restartArgs := []string{"restart", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-restart", + } + + _, _, err = TestFunctionsCommands(restartFunctionsCmd, restartArgs) + assert.Nil(t, err) + + argsFqfn := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-restart-fqfn", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, argsFqfn) + assert.Nil(t, err) + + restartArgsFqfn := []string{"restart", + "--fqfn", "public/default/test-functions-restart-fqfn", + } + + _, _, err = TestFunctionsCommands(restartFunctionsCmd, restartArgsFqfn) + assert.Nil(t, err) +} + +func TestRestartFunctionsFailure(t *testing.T) { + jarName := "dummyExample.jar" + _, err := os.Create(jarName) + assert.Nil(t, err) + + defer os.Remove(jarName) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-restart-failure", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + + // test the function name not exist + failureDeleteArgs := []string{"restart", + "--name", "not-exist", + } + _, err, _ = TestFunctionsCommands(restartFunctionsCmd, failureDeleteArgs) + assert.NotNil(t, err) + failMsg := "Function not-exist doesn't exist" + assert.True(t, strings.ContainsAny(err.Error(), failMsg)) + + // test the --name args not exist + notExistNameOrFqfnArgs := []string{"restart", + "--tenant", "public", + "--namespace", "default", + } + _, err, _ = TestFunctionsCommands(restartFunctionsCmd, notExistNameOrFqfnArgs) + assert.NotNil(t, err) + failNameMsg := "you must specify a name for the function or a Fully Qualified Function Name (FQFN)" + assert.True(t, strings.ContainsAny(err.Error(), failNameMsg)) + + // test the instance id not exist + notExistInstanceIDArgs := []string{"restart", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-restart-failure", + "--instance-id", "12345678", + } + _, err, _ = TestFunctionsCommands(restartFunctionsCmd, notExistInstanceIDArgs) + assert.NotNil(t, err) + failInstanceIDMsg := "Operation not permitted" + assert.True(t, strings.ContainsAny(err.Error(), failInstanceIDMsg)) +} diff --git a/pkg/pulsar/functions.go b/pkg/pulsar/functions.go index 71815ca0..cffcfe28 100644 --- a/pkg/pulsar/functions.go +++ b/pkg/pulsar/functions.go @@ -58,6 +58,12 @@ type Functions interface { // Start function instance StartFunctionWithID(tenant, namespace, name string, instanceID int) error + + // Restart all function instances + RestartFunction(tenant, namespace, name string) error + + // Restart function instance + RestartFunctionWithID(tenant, namespace, name string, instanceID int) error } type functions struct { @@ -219,3 +225,15 @@ func (f *functions) StartFunctionWithID(tenant, namespace, name string, instance return f.client.post(endpoint+"/start", "", nil) } +func (f *functions) RestartFunction(tenant, namespace, name string) error { + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name) + return f.client.post(endpoint+"/restart", "", nil) +} + +func (f *functions) RestartFunctionWithID(tenant, namespace, name string, instanceID int) error { + id := fmt.Sprintf("%d", instanceID) + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, id) + + return f.client.post(endpoint+"/restart", "", nil) +} + From 27e269330f9cd36102ee1df446c45eb28e126f97 Mon Sep 17 00:00:00 2001 From: "xiaolong.ran" Date: Tue, 3 Sep 2019 20:52:04 +0800 Subject: [PATCH 6/7] Add list cmd for Pulsar Functions Signed-off-by: xiaolong.ran --- pkg/ctl/functions/functions.go | 1 + pkg/ctl/functions/list.go | 104 +++++++++++++++++++++++++++++++++ pkg/ctl/functions/list_test.go | 67 +++++++++++++++++++++ pkg/ctl/functions/util.go | 7 +++ pkg/pulsar/functions.go | 9 +++ 5 files changed, 188 insertions(+) create mode 100644 pkg/ctl/functions/list.go create mode 100644 pkg/ctl/functions/list_test.go diff --git a/pkg/ctl/functions/functions.go b/pkg/ctl/functions/functions.go index a7ccb832..8be14f89 100644 --- a/pkg/ctl/functions/functions.go +++ b/pkg/ctl/functions/functions.go @@ -36,6 +36,7 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command { cmdutils.AddVerbCmd(flagGrouping, resourceCmd, deleteFunctionsCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, startFunctionsCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, restartFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, listFunctionsCmd) return resourceCmd } diff --git a/pkg/ctl/functions/list.go b/pkg/ctl/functions/list.go new file mode 100644 index 00000000..b0090155 --- /dev/null +++ b/pkg/ctl/functions/list.go @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/olekukonko/tablewriter" + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" +) + +func listFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "List all Pulsar Functions running under a specific tenant and namespace." + desc.CommandPermission = "This command requires super-user permissions." + + var examples []pulsar.Example + + list := pulsar.Example{ + Desc: "List all Pulsar Functions running under a specific tenant and namespace", + Command: "pulsarctl functions list \n" + + "\t--tenant public\n" + + "\t--namespace default", + } + examples = append(examples, list) + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normal output", + Out: "+--------------------+\n" + + "| Function Name |\n" + + "+--------------------+\n" + + "| test_function_name |\n" + + "+--------------------+", + } + + out = append(out, successOut) + desc.CommandOutput = out + + vc.SetDescription( + "list", + "List all Pulsar Functions running under a specific tenant and namespace", + desc.ToString(), + "list", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFunc(func() error { + return doListFunctions(vc, functionData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + }) +} + +func doListFunctions(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + processNamespaceCmd(funcData) + + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + functions, err := admin.Functions().GetFunctions(funcData.Tenant, funcData.Namespace) + if err != nil { + cmdutils.PrintError(vc.Command.OutOrStderr(), err) + } else { + table := tablewriter.NewWriter(vc.Command.OutOrStdout()) + table.SetHeader([]string{"Pulsar Function Name"}) + + for _, f := range functions { + table.Append([]string{f}) + } + + table.Render() + } + return err +} diff --git a/pkg/ctl/functions/list_test.go b/pkg/ctl/functions/list_test.go new file mode 100644 index 00000000..b04fa60f --- /dev/null +++ b/pkg/ctl/functions/list_test.go @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/stretchr/testify/assert" + "os" + "strings" + "testing" +) + +func TestListFunctions(t *testing.T) { + jarName := "dummyExample.jar" + _, err := os.Create(jarName) + assert.Nil(t, err) + + defer os.Remove(jarName) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-list", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + + listArgs := []string{"list"} + functions, _, err := TestFunctionsCommands(listFunctionsCmd, listArgs) + assert.Nil(t, err) + assert.True(t, strings.Contains(functions.String(), "test-functions-list")) + + deleteArgs := []string{"delete", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-list", + } + + _, _, err = TestFunctionsCommands(deleteFunctionsCmd, deleteArgs) + assert.Nil(t, nil) + + listArgsAgain := []string{"list", + "--tenant", "public", + "--namespace", "default", + } + out, _, err := TestFunctionsCommands(listFunctionsCmd, listArgsAgain) + assert.Nil(t, err) + assert.False(t, strings.Contains(out.String(), "test-functions-list")) +} diff --git a/pkg/ctl/functions/util.go b/pkg/ctl/functions/util.go index 30756e4b..182c2bca 100644 --- a/pkg/ctl/functions/util.go +++ b/pkg/ctl/functions/util.go @@ -416,3 +416,10 @@ func processBaseArguments(funcData *pulsar.FunctionData) error { return nil } + +func processNamespaceCmd(funcData *pulsar.FunctionData) { + if funcData.Tenant == "" || funcData.Namespace == "" { + funcData.Tenant = PublicTenant + funcData.Namespace = DefaultNamespace + } +} diff --git a/pkg/pulsar/functions.go b/pkg/pulsar/functions.go index cffcfe28..74b388d0 100644 --- a/pkg/pulsar/functions.go +++ b/pkg/pulsar/functions.go @@ -64,6 +64,9 @@ type Functions interface { // Restart function instance RestartFunctionWithID(tenant, namespace, name string, instanceID int) error + + // Get the list of functions + GetFunctions(tenant, namespace string) ([]string, error) } type functions struct { @@ -237,3 +240,9 @@ func (f *functions) RestartFunctionWithID(tenant, namespace, name string, instan return f.client.post(endpoint+"/restart", "", nil) } +func (f *functions) GetFunctions(tenant, namespace string) ([]string, error) { + var functions []string + endpoint := f.client.endpoint(f.basePath, tenant, namespace) + err := f.client.get(endpoint, &functions) + return functions, err +} From ad15c9c3a8548a6a0dcca2e79fbcd03a982d89a2 Mon Sep 17 00:00:00 2001 From: "xiaolong.ran" Date: Wed, 4 Sep 2019 11:58:45 +0800 Subject: [PATCH 7/7] Add get cmd for Pulsar functions Signed-off-by: xiaolong.ran --- pkg/ctl/functions/functions.go | 1 + pkg/ctl/functions/get.go | 141 +++++++++++++++++++++++++++++++++ pkg/ctl/functions/get_test.go | 104 ++++++++++++++++++++++++ pkg/ctl/functions/util.go | 36 ++++----- pkg/pulsar/functionConfg.go | 22 ++--- pkg/pulsar/functions.go | 10 +++ pkg/pulsar/sinkConfig.go | 2 +- pkg/pulsar/sourceConfig.go | 2 +- 8 files changed, 279 insertions(+), 39 deletions(-) create mode 100644 pkg/ctl/functions/get.go create mode 100644 pkg/ctl/functions/get_test.go diff --git a/pkg/ctl/functions/functions.go b/pkg/ctl/functions/functions.go index 8be14f89..7b85519b 100644 --- a/pkg/ctl/functions/functions.go +++ b/pkg/ctl/functions/functions.go @@ -37,6 +37,7 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command { cmdutils.AddVerbCmd(flagGrouping, resourceCmd, startFunctionsCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, restartFunctionsCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, listFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getFunctionsCmd) return resourceCmd } diff --git a/pkg/ctl/functions/get.go b/pkg/ctl/functions/get.go new file mode 100644 index 00000000..adccd07b --- /dev/null +++ b/pkg/ctl/functions/get.go @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" +) + +func getFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "Fetch information about a Pulsar Function" + desc.CommandPermission = "This command requires super-user permissions." + + var examples []pulsar.Example + + get := pulsar.Example{ + Desc: "Fetch information about a Pulsar Function", + Command: "pulsarctl functions get \n" + + "\t--tenant public\n" + + "\t--namespace default \n" + + "\t--name ", + } + + getWithFqfn := pulsar.Example{ + Desc: "Fetch information about a Pulsar Function with FQFN", + Command: "pulsarctl functions get \n" + + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions]", + } + examples = append(examples, get, getWithFqfn) + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normal output", + Out: "{\n " + + "\"tenant\": \"public\",\n " + + "\"namespace\": \"default\",\n " + + "\"name\": \"test-functions\",\n " + + "\"className\": \"org.apache.pulsar.functions.api.examples.ExclamationFunction\",\n " + + "\"inputSpecs\": {\n \"persistent://public/default/test-topic-1\": {\n " + + "\"isRegexPattern\": false\n " + + "}\n " + + "},\n " + + "\"output\": \"persistent://public/default/test-topic-2\",\n " + + "\"processingGuarantees\": \"ATLEAST_ONCE\",\n " + + "\"retainOrdering\": false,\n \"userConfig\": {},\n " + + "\"runtime\": \"JAVA\",\n \"autoAck\": true,\n " + + "\"parallelism\": 1,\n \"resources\": {\n " + + "\"cpu\": 1.0,\n \"ram\": 1073741824,\n \"disk\": 10737418240\n },\n " + + "\"cleanupSubscription\": true\n}", + } + + failOut := pulsar.Output{ + Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + } + + failOutWithNameNotExist := pulsar.Output{ + Desc: "The name of Pulsar Functions doesn't exist, please check the --name args", + Out: "[✖] code: 404 reason: Function doesn't exist", + } + + out = append(out, successOut, failOut, failOutWithNameNotExist) + desc.CommandOutput = out + + vc.SetDescription( + "get", + "Fetch information about a Pulsar Function", + desc.ToString(), + "get", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFunc(func() error { + return doGetFunctions(vc, functionData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.FQFN, + "fqfn", + "", + "The Fully Qualified Function Name (FQFN) for the function") + + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + + flagSet.StringVar( + &functionData.FuncName, + "name", + "", + "The name of a Pulsar Function") + }) +} + +func doGetFunctions(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + err := processBaseArguments(funcData) + if err != nil { + vc.Command.Help() + return err + } + + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + functionConfig, err := admin.Functions().GetFunction(funcData.Tenant, funcData.Namespace, funcData.FuncName) + if err != nil { + cmdutils.PrintError(vc.Command.OutOrStderr(), err) + } else { + cmdutils.PrintJson(vc.Command.OutOrStdout(), functionConfig) + } + + return err +} diff --git a/pkg/ctl/functions/get_test.go b/pkg/ctl/functions/get_test.go new file mode 100644 index 00000000..2a7c94b5 --- /dev/null +++ b/pkg/ctl/functions/get_test.go @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "encoding/json" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "github.com/stretchr/testify/assert" + "os" + "strings" + "testing" +) + +func TestGetFunction(t *testing.T) { + jarName := "dummyExample.jar" + _, err := os.Create(jarName) + assert.Nil(t, err) + + defer os.Remove(jarName) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-get", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + + getArgs := []string{"get", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-get", + } + + out, _, err := TestFunctionsCommands(getFunctionsCmd, getArgs) + assert.Nil(t, err) + + var functionConfig pulsar.FunctionConfig + err = json.Unmarshal(out.Bytes(), &functionConfig) + assert.Nil(t, err) + + assert.Equal(t, functionConfig.Tenant, "public") + assert.Equal(t, functionConfig.Namespace, "default") + assert.Equal(t, functionConfig.Name, "test-functions-get") + assert.Equal(t, functionConfig.Output, "persistent://public/default/test-output-topic") + assert.Equal(t, functionConfig.ClassName, "org.apache.pulsar.functions.api.examples.ExclamationFunction") +} + +func TestGetFunctionsWithFailure(t *testing.T) { + jarName := "dummyExample.jar" + _, err := os.Create(jarName) + assert.Nil(t, err) + + defer os.Remove(jarName) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-get-failure", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + + failureDeleteArgs := []string{"get", + "--name", "not-exist", + } + + _, execErrMsg, _ := TestFunctionsCommands(getFunctionsCmd, failureDeleteArgs) + assert.NotNil(t, execErrMsg) + exceptMsg := "Function not-exist doesn't exist" + assert.True(t, strings.Contains(execErrMsg.Error(), exceptMsg)) + + notExistNameOrFqfnArgs := []string{"get", + "--tenant", "public", + "--namespace", "default", + } + _, execErrMsg, _ = TestFunctionsCommands(getFunctionsCmd, notExistNameOrFqfnArgs) + failMsg := "you must specify a name for the function or a Fully Qualified Function Name (FQFN)" + assert.NotNil(t, execErrMsg) + assert.True(t, strings.Contains(execErrMsg.Error(), failMsg)) +} diff --git a/pkg/ctl/functions/util.go b/pkg/ctl/functions/util.go index 182c2bca..20f4dfb4 100644 --- a/pkg/ctl/functions/util.go +++ b/pkg/ctl/functions/util.go @@ -196,20 +196,6 @@ func processArgs(funcData *pulsar.FunctionData) error { funcData.FuncConf.OutputSchemaType = funcData.SchemaType } - // processingGuarantees default value is 0, means AtLeastOnce. - if funcData.ProcessingGuarantees != "" { - switch funcData.ProcessingGuarantees { - case "ATMOST_ONCE": - funcData.FuncConf.ProcessingGuarantees = pulsar.AtMostOnce - case "EFFECTIVELY_ONCE": - funcData.FuncConf.ProcessingGuarantees = pulsar.EffectivelyOnce - case "ATLEAST_ONCE": - funcData.FuncConf.ProcessingGuarantees = pulsar.AtLeasetOnce - default: - funcData.FuncConf.ProcessingGuarantees = pulsar.AtLeasetOnce - } - } - if funcData.RetainOrdering { funcData.FuncConf.RetainOrdering = funcData.RetainOrdering } @@ -336,13 +322,6 @@ func processArgs(funcData *pulsar.FunctionData) error { } func validateFunctionConfigs(functionConfig *pulsar.FunctionConfig) error { - // go doesn't need className - if functionConfig.Runtime == pulsar.Python || functionConfig.Runtime == pulsar.Java { - if functionConfig.ClassName == "" { - return errors.New("no Function Classname specified") - } - } - if functionConfig.Name == "" { inferMissingFunctionName(functionConfig) } @@ -380,6 +359,21 @@ func validateFunctionConfigs(functionConfig *pulsar.FunctionConfig) error { return errors.New("the specified go file does not exist") } + if functionConfig.Jar != "" { + functionConfig.Runtime = pulsar.JavaRuntime + } else if functionConfig.Py != "" { + functionConfig.Runtime = pulsar.PythonRuntime + } else if functionConfig.Go != "" { + functionConfig.Runtime = pulsar.GoRuntime + } + + // go doesn't need className + if functionConfig.Runtime == pulsar.JavaRuntime || functionConfig.Runtime == pulsar.PythonRuntime { + if functionConfig.ClassName == "" { + return errors.New("no Function Classname specified") + } + } + return nil } diff --git a/pkg/pulsar/functionConfg.go b/pkg/pulsar/functionConfg.go index 01c0ffdf..13eb356e 100644 --- a/pkg/pulsar/functionConfg.go +++ b/pkg/pulsar/functionConfg.go @@ -17,20 +17,10 @@ package pulsar -type ProcessingGuarantees int - -type Runtime int - -const ( - AtLeasetOnce ProcessingGuarantees = iota - AtMostOnce - EffectivelyOnce -) - const ( - Java Runtime = iota - Python - Go + JavaRuntime = "JAVA" + PythonRuntime = "PYTHON" + GoRuntime = "GO" ) type FunctionConfig struct { @@ -58,7 +48,7 @@ type FunctionConfig struct { OutputSerdeClassName string `json:"outputSerdeClassName" yaml:"outputSerdeClassName"` LogTopic string `json:"logTopic" yaml:"logTopic"` - ProcessingGuarantees ProcessingGuarantees `json:"processingGuarantees" yaml:"processingGuarantees"` + ProcessingGuarantees string `json:"processingGuarantees" yaml:"processingGuarantees"` RetainOrdering bool `json:"retainOrdering" yaml:"retainOrdering"` UserConfig map[string]interface{} `json:"userConfig" yaml:"userConfig"` @@ -69,7 +59,7 @@ type FunctionConfig struct { // SecretProviderConfigurator.getSecretObjectType() method. Secrets map[string]interface{} `json:"secrets" yaml:"secrets"` - Runtime Runtime `json:"runtime" yaml:"runtime"` + Runtime string `json:"runtime" yaml:"runtime"` AutoAck bool `json:"autoAck" yaml:"autoAck"` MaxMessageRetries int `json:"maxMessageRetries" yaml:"maxMessageRetries"` DeadLetterTopic string `json:"deadLetterTopic" yaml:"deadLetterTopic"` @@ -78,7 +68,7 @@ type FunctionConfig struct { Resources *Resources `json:"resources" yaml:"resources"` FQFN string `json:"fqfn" yaml:"fqfn"` WindowConfig *WindowConfig `json:"windowConfig" yaml:"windowConfig"` - TimeoutMs *int64 `json:"timeoutMs" yaml:"timeoutMs"` + TimeoutMs *int64 `json:"timeoutMs" yaml:"timeoutMs"` Jar string `json:"jar" yaml:"jar"` Py string `json:"py" yaml:"py"` Go string `json:"go" yaml:"go"` diff --git a/pkg/pulsar/functions.go b/pkg/pulsar/functions.go index 74b388d0..089c8c50 100644 --- a/pkg/pulsar/functions.go +++ b/pkg/pulsar/functions.go @@ -67,6 +67,9 @@ type Functions interface { // Get the list of functions GetFunctions(tenant, namespace string) ([]string, error) + + // Get the configuration for the specified function + GetFunction(tenant, namespace, name string) (FunctionConfig, error) } type functions struct { @@ -246,3 +249,10 @@ func (f *functions) GetFunctions(tenant, namespace string) ([]string, error) { err := f.client.get(endpoint, &functions) return functions, err } + +func (f *functions) GetFunction(tenant, namespace, name string) (FunctionConfig, error) { + var functionConfig FunctionConfig + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name) + err := f.client.get(endpoint, &functionConfig) + return functionConfig, err +} diff --git a/pkg/pulsar/sinkConfig.go b/pkg/pulsar/sinkConfig.go index 75dc3cd0..db28bd56 100644 --- a/pkg/pulsar/sinkConfig.go +++ b/pkg/pulsar/sinkConfig.go @@ -38,7 +38,7 @@ type SinkConfig struct { // SecretProviderConfigurator.getSecretObjectType() method. Secrets map[string]interface{} Parallelism int - ProcessingGuarantees ProcessingGuarantees + ProcessingGuarantees string RetainOrdering bool Resources Resources AutoAck bool diff --git a/pkg/pulsar/sourceConfig.go b/pkg/pulsar/sourceConfig.go index b0e80dc1..1a1b263a 100644 --- a/pkg/pulsar/sourceConfig.go +++ b/pkg/pulsar/sourceConfig.go @@ -37,7 +37,7 @@ type SourceConfig struct { Secrets map[string]interface{} Parallelism int - ProcessingGuarantees ProcessingGuarantees + ProcessingGuarantees string Resources Resources Archive string // Any flags that you want to pass to the runtime.