diff --git a/pkg/ctl/cluster/test.go b/pkg/ctl/cluster/test.go index cd0537e6..b308c1b9 100644 --- a/pkg/ctl/cluster/test.go +++ b/pkg/ctl/cluster/test.go @@ -8,8 +8,7 @@ import ( "os" ) -func TestClusterCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (out *bytes.Buffer, execErr, nameErr, err error) { - +func TestClusterCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (out *bytes.Buffer, execErr, nameErr, err error) { var execError error cmdutils.ExecErrorHandler = func(err error) { execError = err @@ -20,9 +19,9 @@ func TestClusterCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (ou nameError = err } - var rootCmd = &cobra.Command { - Use: "pulsarctl [command]", - Short: "a CLI for Apache Pulsar", + var rootCmd = &cobra.Command{ + Use: "pulsarctl [command]", + Short: "a CLI for Apache Pulsar", Run: func(cmd *cobra.Command, _ []string) { if err := cmd.Help(); err != nil { logger.Debug("ignoring error %q", err.Error()) @@ -34,7 +33,6 @@ func TestClusterCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (ou rootCmd.SetOut(buf) rootCmd.SetArgs(append([]string{"clusters"}, args...)) - resourceCmd := cmdutils.NewResourceCmd( "clusters", "Operations about cluster(s)", @@ -53,10 +51,10 @@ var ( basePath string ) -func TestTlsHelp(newVerb func(cmd *cmdutils.VerbCmd), args []string)(out *bytes.Buffer, err error) { - var rootCmd = &cobra.Command { - Use: "pulsarctl [command]", - Short: "a CLI for Apache Pulsar", +func TestTlsHelp(newVerb func(cmd *cmdutils.VerbCmd), args []string) (out *bytes.Buffer, err error) { + var rootCmd = &cobra.Command{ + Use: "pulsarctl [command]", + Short: "a CLI for Apache Pulsar", Run: func(cmd *cobra.Command, _ []string) { if err := cmd.Help(); err != nil { logger.Debug("ignoring error %q", err.Error()) @@ -84,9 +82,9 @@ func TestTlsHelp(newVerb func(cmd *cmdutils.VerbCmd), args []string)(out *bytes. baseArgs := []string{ "--auth-params", - "{\"tlsCertFile\":\""+basePath+"/test/auth/certs/client-cert.pem\""+ - ",\"tlsKeyFile\":\""+basePath+"/test/auth/certs/client-key.pem\"}", - "--tls-trust-cert-pat", basePath+"/test/auth/certs/cacert.pem", + "{\"tlsCertFile\":\"" + basePath + "/test/auth/certs/client-cert.pem\"" + + ",\"tlsKeyFile\":\"" + basePath + "/test/auth/certs/client-key.pem\"}", + "--tls-trust-cert-pat", basePath + "/test/auth/certs/cacert.pem", "--admin-service-url", "https://localhost:8443", "--tls-allow-insecure"} diff --git a/pkg/ctl/functions/create_test.go b/pkg/ctl/functions/create_test.go index d7bb6509..43f8f0cb 100644 --- a/pkg/ctl/functions/create_test.go +++ b/pkg/ctl/functions/create_test.go @@ -55,7 +55,7 @@ func TestCreateFunctions(t *testing.T) { "--processing-guarantees", "EFFECTIVELY_ONCE", } - _, err = TestFunctionsCommands(createFunctionsCmd, args) + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) assert.Nil(t, err) // $ bin/pulsar-admin functions create @@ -66,7 +66,7 @@ func TestCreateFunctions(t *testing.T) { "--jar", jarName, } - _, err = TestFunctionsCommands(createFunctionsCmd, argsWithConf) + _, _, err = TestFunctionsCommands(createFunctionsCmd, argsWithConf) assert.Nil(t, err) argsWithFileUrl := []string{"create", @@ -80,6 +80,6 @@ func TestCreateFunctions(t *testing.T) { "--processing-guarantees", "EFFECTIVELY_ONCE", } - _, err = TestFunctionsCommands(createFunctionsCmd, argsWithFileUrl) + _, _, err = TestFunctionsCommands(createFunctionsCmd, argsWithFileUrl) assert.Nil(t, err) } diff --git a/pkg/ctl/functions/delete.go b/pkg/ctl/functions/delete.go new file mode 100644 index 00000000..06f1d38b --- /dev/null +++ b/pkg/ctl/functions/delete.go @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" +) + +func deleteFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "This command is used for delete a Pulsar Function that is running on a Pulsar cluster." + desc.CommandPermission = "This command requires super-user permissions." + + var examples []pulsar.Example + + del := pulsar.Example{ + Desc: "Delete a Pulsar Function that is running on a Pulsar cluster", + Command: "pulsarctl functions delete \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name ", + } + examples = append(examples, del) + + delWithInstanceID := pulsar.Example{ + Desc: "Delete a Pulsar Function that is running on a Pulsar cluster with instance ID", + Command: "pulsarctl functions delete \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name \n" + + "\t--instance-id 1", + } + examples = append(examples, delWithInstanceID) + + delWithFqfn := pulsar.Example{ + Desc: "Delete a Pulsar Function that is running on a Pulsar cluster with FQFN", + Command: "pulsarctl functions delete \n" + + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions]", + } + examples = append(examples, delWithFqfn) + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normal output", + Out: "Deleted successfully", + } + + failOut := pulsar.Output{ + Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + } + + failOutWithNameNotExist := pulsar.Output{ + Desc: "The name of Pulsar Functions doesn't exist, please check the --name args", + Out: "[✖] code: 404 reason: Function doesn't exist", + } + + out = append(out, successOut, failOut, failOutWithNameNotExist) + desc.CommandOutput = out + + vc.SetDescription( + "delete", + "Delete a Pulsar Function that is running on a Pulsar cluster", + desc.ToString(), + "delete", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFunc(func() error { + return doDeleteFunctions(vc, functionData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.FQFN, + "fqfn", + "", + "The Fully Qualified Function Name (FQFN) for the function") + + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + + flagSet.StringVar( + &functionData.FuncName, + "name", + "", + "The name of a Pulsar Function") + }) +} + +func doDeleteFunctions(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + err := processBaseArguments(funcData) + if err != nil { + vc.Command.Help() + return err + } + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + err = admin.Functions().DeleteFunction(funcData.Tenant, funcData.Namespace, funcData.FuncName) + if err != nil { + return err + } + + vc.Command.Printf("Deleted successfully") + return nil +} diff --git a/pkg/ctl/functions/delete_test.go b/pkg/ctl/functions/delete_test.go new file mode 100644 index 00000000..6b06f220 --- /dev/null +++ b/pkg/ctl/functions/delete_test.go @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/stretchr/testify/assert" + "os" + "strings" + "testing" +) + +func TestDeleteFunctions(t *testing.T) { + jarName := "dummyExample.jar" + _, err := os.Create(jarName) + assert.Nil(t, err) + + defer os.Remove(jarName) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-delete", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + + deleteArgs := []string{"delete", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-delete", + } + + _, _, err = TestFunctionsCommands(deleteFunctionsCmd, deleteArgs) + assert.Nil(t, err) + + argsFqfn := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-delete-fqfn", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, argsFqfn) + assert.Nil(t, err) + + deleteArgsFqfn := []string{"delete", + "--fqfn", "public/default/test-functions-delete-fqfn", + } + + _, _, err = TestFunctionsCommands(deleteFunctionsCmd, deleteArgsFqfn) + assert.Nil(t, err) +} + +func TestDeleteFunctionsWithFailure(t *testing.T) { + jarName := "dummyExample.jar" + _, err := os.Create(jarName) + assert.Nil(t, err) + + defer os.Remove(jarName) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-delete-failure", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) + + assert.Nil(t, err) + + failureDeleteArgs := []string{"delete", + "--name", "not-exist", + } + + _, execErrMsg, _ := TestFunctionsCommands(deleteFunctionsCmd, failureDeleteArgs) + assert.NotNil(t, execErrMsg) + exceptMsg := "Function not-exist doesn't exist" + assert.True(t, strings.ContainsAny(execErrMsg.Error(), exceptMsg)) + + notExistNameOrFqfnArgs := []string{"delete", + "--tenant", "public", + "--namespace", "default", + } + _, execErrMsg, _ = TestFunctionsCommands(deleteFunctionsCmd, notExistNameOrFqfnArgs) + failMsg := "you must specify a name for the function or a Fully Qualified Function Name (FQFN)" + assert.NotNil(t, execErrMsg) + assert.True(t, strings.ContainsAny(execErrMsg.Error(), failMsg)) +} diff --git a/pkg/ctl/functions/functions.go b/pkg/ctl/functions/functions.go index e92891f5..7b85519b 100644 --- a/pkg/ctl/functions/functions.go +++ b/pkg/ctl/functions/functions.go @@ -33,6 +33,11 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command { cmdutils.AddVerbCmd(flagGrouping, resourceCmd, createFunctionsCmd) cmdutils.AddVerbCmd(flagGrouping, resourceCmd, stopFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, deleteFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, startFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, restartFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, listFunctionsCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getFunctionsCmd) return resourceCmd } diff --git a/pkg/ctl/functions/get.go b/pkg/ctl/functions/get.go new file mode 100644 index 00000000..adccd07b --- /dev/null +++ b/pkg/ctl/functions/get.go @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" +) + +func getFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "Fetch information about a Pulsar Function" + desc.CommandPermission = "This command requires super-user permissions." + + var examples []pulsar.Example + + get := pulsar.Example{ + Desc: "Fetch information about a Pulsar Function", + Command: "pulsarctl functions get \n" + + "\t--tenant public\n" + + "\t--namespace default \n" + + "\t--name ", + } + + getWithFqfn := pulsar.Example{ + Desc: "Fetch information about a Pulsar Function with FQFN", + Command: "pulsarctl functions get \n" + + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions]", + } + examples = append(examples, get, getWithFqfn) + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normal output", + Out: "{\n " + + "\"tenant\": \"public\",\n " + + "\"namespace\": \"default\",\n " + + "\"name\": \"test-functions\",\n " + + "\"className\": \"org.apache.pulsar.functions.api.examples.ExclamationFunction\",\n " + + "\"inputSpecs\": {\n \"persistent://public/default/test-topic-1\": {\n " + + "\"isRegexPattern\": false\n " + + "}\n " + + "},\n " + + "\"output\": \"persistent://public/default/test-topic-2\",\n " + + "\"processingGuarantees\": \"ATLEAST_ONCE\",\n " + + "\"retainOrdering\": false,\n \"userConfig\": {},\n " + + "\"runtime\": \"JAVA\",\n \"autoAck\": true,\n " + + "\"parallelism\": 1,\n \"resources\": {\n " + + "\"cpu\": 1.0,\n \"ram\": 1073741824,\n \"disk\": 10737418240\n },\n " + + "\"cleanupSubscription\": true\n}", + } + + failOut := pulsar.Output{ + Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + } + + failOutWithNameNotExist := pulsar.Output{ + Desc: "The name of Pulsar Functions doesn't exist, please check the --name args", + Out: "[✖] code: 404 reason: Function doesn't exist", + } + + out = append(out, successOut, failOut, failOutWithNameNotExist) + desc.CommandOutput = out + + vc.SetDescription( + "get", + "Fetch information about a Pulsar Function", + desc.ToString(), + "get", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFunc(func() error { + return doGetFunctions(vc, functionData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.FQFN, + "fqfn", + "", + "The Fully Qualified Function Name (FQFN) for the function") + + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + + flagSet.StringVar( + &functionData.FuncName, + "name", + "", + "The name of a Pulsar Function") + }) +} + +func doGetFunctions(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + err := processBaseArguments(funcData) + if err != nil { + vc.Command.Help() + return err + } + + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + functionConfig, err := admin.Functions().GetFunction(funcData.Tenant, funcData.Namespace, funcData.FuncName) + if err != nil { + cmdutils.PrintError(vc.Command.OutOrStderr(), err) + } else { + cmdutils.PrintJson(vc.Command.OutOrStdout(), functionConfig) + } + + return err +} diff --git a/pkg/ctl/functions/get_test.go b/pkg/ctl/functions/get_test.go new file mode 100644 index 00000000..2a7c94b5 --- /dev/null +++ b/pkg/ctl/functions/get_test.go @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "encoding/json" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "github.com/stretchr/testify/assert" + "os" + "strings" + "testing" +) + +func TestGetFunction(t *testing.T) { + jarName := "dummyExample.jar" + _, err := os.Create(jarName) + assert.Nil(t, err) + + defer os.Remove(jarName) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-get", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + + getArgs := []string{"get", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-get", + } + + out, _, err := TestFunctionsCommands(getFunctionsCmd, getArgs) + assert.Nil(t, err) + + var functionConfig pulsar.FunctionConfig + err = json.Unmarshal(out.Bytes(), &functionConfig) + assert.Nil(t, err) + + assert.Equal(t, functionConfig.Tenant, "public") + assert.Equal(t, functionConfig.Namespace, "default") + assert.Equal(t, functionConfig.Name, "test-functions-get") + assert.Equal(t, functionConfig.Output, "persistent://public/default/test-output-topic") + assert.Equal(t, functionConfig.ClassName, "org.apache.pulsar.functions.api.examples.ExclamationFunction") +} + +func TestGetFunctionsWithFailure(t *testing.T) { + jarName := "dummyExample.jar" + _, err := os.Create(jarName) + assert.Nil(t, err) + + defer os.Remove(jarName) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-get-failure", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + + failureDeleteArgs := []string{"get", + "--name", "not-exist", + } + + _, execErrMsg, _ := TestFunctionsCommands(getFunctionsCmd, failureDeleteArgs) + assert.NotNil(t, execErrMsg) + exceptMsg := "Function not-exist doesn't exist" + assert.True(t, strings.Contains(execErrMsg.Error(), exceptMsg)) + + notExistNameOrFqfnArgs := []string{"get", + "--tenant", "public", + "--namespace", "default", + } + _, execErrMsg, _ = TestFunctionsCommands(getFunctionsCmd, notExistNameOrFqfnArgs) + failMsg := "you must specify a name for the function or a Fully Qualified Function Name (FQFN)" + assert.NotNil(t, execErrMsg) + assert.True(t, strings.Contains(execErrMsg.Error(), failMsg)) +} diff --git a/pkg/ctl/functions/list.go b/pkg/ctl/functions/list.go new file mode 100644 index 00000000..b0090155 --- /dev/null +++ b/pkg/ctl/functions/list.go @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/olekukonko/tablewriter" + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" +) + +func listFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "List all Pulsar Functions running under a specific tenant and namespace." + desc.CommandPermission = "This command requires super-user permissions." + + var examples []pulsar.Example + + list := pulsar.Example{ + Desc: "List all Pulsar Functions running under a specific tenant and namespace", + Command: "pulsarctl functions list \n" + + "\t--tenant public\n" + + "\t--namespace default", + } + examples = append(examples, list) + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normal output", + Out: "+--------------------+\n" + + "| Function Name |\n" + + "+--------------------+\n" + + "| test_function_name |\n" + + "+--------------------+", + } + + out = append(out, successOut) + desc.CommandOutput = out + + vc.SetDescription( + "list", + "List all Pulsar Functions running under a specific tenant and namespace", + desc.ToString(), + "list", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFunc(func() error { + return doListFunctions(vc, functionData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + }) +} + +func doListFunctions(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + processNamespaceCmd(funcData) + + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + functions, err := admin.Functions().GetFunctions(funcData.Tenant, funcData.Namespace) + if err != nil { + cmdutils.PrintError(vc.Command.OutOrStderr(), err) + } else { + table := tablewriter.NewWriter(vc.Command.OutOrStdout()) + table.SetHeader([]string{"Pulsar Function Name"}) + + for _, f := range functions { + table.Append([]string{f}) + } + + table.Render() + } + return err +} diff --git a/pkg/ctl/functions/list_test.go b/pkg/ctl/functions/list_test.go new file mode 100644 index 00000000..b04fa60f --- /dev/null +++ b/pkg/ctl/functions/list_test.go @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/stretchr/testify/assert" + "os" + "strings" + "testing" +) + +func TestListFunctions(t *testing.T) { + jarName := "dummyExample.jar" + _, err := os.Create(jarName) + assert.Nil(t, err) + + defer os.Remove(jarName) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-list", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + + listArgs := []string{"list"} + functions, _, err := TestFunctionsCommands(listFunctionsCmd, listArgs) + assert.Nil(t, err) + assert.True(t, strings.Contains(functions.String(), "test-functions-list")) + + deleteArgs := []string{"delete", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-list", + } + + _, _, err = TestFunctionsCommands(deleteFunctionsCmd, deleteArgs) + assert.Nil(t, nil) + + listArgsAgain := []string{"list", + "--tenant", "public", + "--namespace", "default", + } + out, _, err := TestFunctionsCommands(listFunctionsCmd, listArgsAgain) + assert.Nil(t, err) + assert.False(t, strings.Contains(out.String(), "test-functions-list")) +} diff --git a/pkg/ctl/functions/restart.go b/pkg/ctl/functions/restart.go new file mode 100644 index 00000000..6da769bd --- /dev/null +++ b/pkg/ctl/functions/restart.go @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "strconv" +) + +func restartFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "This command is used for restarting function instance." + desc.CommandPermission = "This command requires super-user permissions." + + var examples []pulsar.Example + + restart := pulsar.Example{ + Desc: "Restart function instance", + Command: "pulsarctl functions restart \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name ", + } + examples = append(examples, restart) + + restartWithInstanceID := pulsar.Example{ + Desc: "Restart function instance with instance ID", + Command: "pulsarctl functions restart \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name \n" + + "\t--instance-id 1", + } + examples = append(examples, restartWithInstanceID) + + restartWithFQFN := pulsar.Example{ + Desc: "Restart function instance with FQFN", + Command: "pulsarctl functions restart \n" + + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions]", + } + examples = append(examples, restartWithFQFN) + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normal output", + Out: "Restarted successfully", + } + + failOut := pulsar.Output{ + Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + } + + failOutWithNameNotExist := pulsar.Output{ + Desc: "The name of Pulsar Functions doesn't exist, please check the --name args", + Out: "[✖] code: 404 reason: Function doesn't exist", + } + + failOutWithWrongInstanceID := pulsar.Output{ + Desc: "Used an instanceID that does not exist or other impermissible actions", + Out: "[✖] code: 400 reason: Operation not permitted", + } + + out = append(out, successOut, failOut, failOutWithNameNotExist, failOutWithWrongInstanceID) + desc.CommandOutput = out + + vc.SetDescription( + "restart", + "Restart function instance", + desc.ToString(), + "restart", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFunc(func() error { + return doRestartFunctions(vc, functionData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.FQFN, + "fqfn", + "", + "The Fully Qualified Function Name (FQFN) for the function") + + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + + flagSet.StringVar( + &functionData.FuncName, + "name", + "", + "The name of a Pulsar Function") + + flagSet.StringVar( + &functionData.InstanceID, + "instance-id", + "", + "The function instanceId (restart all instances if instance-id is not provided)") + }) +} + +func doRestartFunctions(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + err := processBaseArguments(funcData) + if err != nil { + vc.Command.Help() + return err + } + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + if funcData.InstanceID != "" { + instanceID, err := strconv.Atoi(funcData.InstanceID) + if err != nil { + return err + } + err = admin.Functions().RestartFunctionWithID(funcData.Tenant, funcData.Namespace, funcData.FuncName, instanceID) + if err != nil { + return err + } + vc.Command.Printf("Restarted %s successfully", funcData.FuncName) + } else { + err = admin.Functions().RestartFunction(funcData.Tenant, funcData.Namespace, funcData.FuncName) + if err != nil { + return err + } + + vc.Command.Printf("Restarted %s successfully", funcData.FuncName) + } + + return nil +} diff --git a/pkg/ctl/functions/restart_test.go b/pkg/ctl/functions/restart_test.go new file mode 100644 index 00000000..25f99fdc --- /dev/null +++ b/pkg/ctl/functions/restart_test.go @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/stretchr/testify/assert" + "os" + "strings" + "testing" +) + +func TestRestartFunctions(t *testing.T) { + jarName := "dummyExample.jar" + _, err := os.Create(jarName) + assert.Nil(t, err) + + defer os.Remove(jarName) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-restart", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + + restartArgs := []string{"restart", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-restart", + } + + _, _, err = TestFunctionsCommands(restartFunctionsCmd, restartArgs) + assert.Nil(t, err) + + argsFqfn := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-restart-fqfn", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, argsFqfn) + assert.Nil(t, err) + + restartArgsFqfn := []string{"restart", + "--fqfn", "public/default/test-functions-restart-fqfn", + } + + _, _, err = TestFunctionsCommands(restartFunctionsCmd, restartArgsFqfn) + assert.Nil(t, err) +} + +func TestRestartFunctionsFailure(t *testing.T) { + jarName := "dummyExample.jar" + _, err := os.Create(jarName) + assert.Nil(t, err) + + defer os.Remove(jarName) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-restart-failure", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + + // test the function name not exist + failureDeleteArgs := []string{"restart", + "--name", "not-exist", + } + _, err, _ = TestFunctionsCommands(restartFunctionsCmd, failureDeleteArgs) + assert.NotNil(t, err) + failMsg := "Function not-exist doesn't exist" + assert.True(t, strings.ContainsAny(err.Error(), failMsg)) + + // test the --name args not exist + notExistNameOrFqfnArgs := []string{"restart", + "--tenant", "public", + "--namespace", "default", + } + _, err, _ = TestFunctionsCommands(restartFunctionsCmd, notExistNameOrFqfnArgs) + assert.NotNil(t, err) + failNameMsg := "you must specify a name for the function or a Fully Qualified Function Name (FQFN)" + assert.True(t, strings.ContainsAny(err.Error(), failNameMsg)) + + // test the instance id not exist + notExistInstanceIDArgs := []string{"restart", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-restart-failure", + "--instance-id", "12345678", + } + _, err, _ = TestFunctionsCommands(restartFunctionsCmd, notExistInstanceIDArgs) + assert.NotNil(t, err) + failInstanceIDMsg := "Operation not permitted" + assert.True(t, strings.ContainsAny(err.Error(), failInstanceIDMsg)) +} diff --git a/pkg/ctl/functions/start.go b/pkg/ctl/functions/start.go new file mode 100644 index 00000000..ca865305 --- /dev/null +++ b/pkg/ctl/functions/start.go @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "strconv" +) + +func startFunctionsCmd(vc *cmdutils.VerbCmd) { + desc := pulsar.LongDescription{} + desc.CommandUsedFor = "This command is used for starting a stopped function instance." + desc.CommandPermission = "This command requires super-user permissions." + + var examples []pulsar.Example + + start := pulsar.Example{ + Desc: "Starts a stopped function instance", + Command: "pulsarctl functions start \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name ", + } + examples = append(examples, start) + + startWithInstanceID := pulsar.Example{ + Desc: "Starts a stopped function instance with instance ID", + Command: "pulsarctl functions start \n" + + "\t--tenant public\n" + + "\t--namespace default\n" + + "\t--name \n" + + "\t--instance-id 1", + } + examples = append(examples, startWithInstanceID) + + startWithFQFN := pulsar.Example{ + Desc: "Starts a stopped function instance with FQFN", + Command: "pulsarctl functions start \n" + + "\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions]", + } + examples = append(examples, startWithFQFN) + desc.CommandExamples = examples + + var out []pulsar.Output + successOut := pulsar.Output{ + Desc: "normal output", + Out: "Started successfully", + } + + failOut := pulsar.Output{ + Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + } + + failOutWithNameNotExist := pulsar.Output{ + Desc: "The name of Pulsar Functions doesn't exist, please check the --name args", + Out: "[✖] code: 404 reason: Function doesn't exist", + } + + failOutWithWrongInstanceID := pulsar.Output{ + Desc: "Used an instanceID that does not exist or other impermissible actions", + Out: "[✖] code: 400 reason: Operation not permitted", + } + + out = append(out, successOut, failOut, failOutWithNameNotExist, failOutWithWrongInstanceID) + desc.CommandOutput = out + + vc.SetDescription( + "start", + "Starts a stopped function instance", + desc.ToString(), + "start", + ) + + functionData := &pulsar.FunctionData{} + + // set the run function + vc.SetRunFunc(func() error { + return doStartFunctions(vc, functionData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &functionData.FQFN, + "fqfn", + "", + "The Fully Qualified Function Name (FQFN) for the function") + + flagSet.StringVar( + &functionData.Tenant, + "tenant", + "", + "The tenant of a Pulsar Function") + + flagSet.StringVar( + &functionData.Namespace, + "namespace", + "", + "The namespace of a Pulsar Function") + + flagSet.StringVar( + &functionData.FuncName, + "name", + "", + "The name of a Pulsar Function") + + flagSet.StringVar( + &functionData.InstanceID, + "instance-id", + "", + "The function instanceId (start all instances if instance-id is not provided)") + }) +} + +func doStartFunctions(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error { + err := processBaseArguments(funcData) + if err != nil { + vc.Command.Help() + return err + } + + admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3) + if funcData.InstanceID != "" { + instanceID, err := strconv.Atoi(funcData.InstanceID) + if err != nil { + return err + } + err = admin.Functions().StartFunctionWithID(funcData.Tenant, funcData.Namespace, funcData.FuncName, instanceID) + if err != nil { + return err + } + vc.Command.Printf("Started %s successfully", funcData.FuncName) + } else { + err = admin.Functions().StartFunction(funcData.Tenant, funcData.Namespace, funcData.FuncName) + if err != nil { + return err + } + + vc.Command.Printf("Started %s successfully", funcData.FuncName) + } + + return nil +} diff --git a/pkg/ctl/functions/start_test.go b/pkg/ctl/functions/start_test.go new file mode 100644 index 00000000..38100a48 --- /dev/null +++ b/pkg/ctl/functions/start_test.go @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package functions + +import ( + "github.com/stretchr/testify/assert" + "os" + "strings" + "testing" +) + +func TestStartFunctions(t *testing.T) { + jarName := "dummyExample.jar" + _, err := os.Create(jarName) + assert.Nil(t, err) + + defer os.Remove(jarName) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-start", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + + stopArgs := []string{"stop", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-start", + } + + _, _, err = TestFunctionsCommands(stopFunctionsCmd, stopArgs) + assert.Nil(t, err) + + startArgs := []string{"start", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-start", + } + + _, _, err = TestFunctionsCommands(startFunctionsCmd, startArgs) + assert.Nil(t, err) + + argsFqfn := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-start-fqfn", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, argsFqfn) + assert.Nil(t, err) + + stopArgsFqfn := []string{"stop", + "--fqfn", "public/default/test-functions-start-fqfn", + } + + _, _, err = TestFunctionsCommands(stopFunctionsCmd, stopArgsFqfn) + assert.Nil(t, err) + + startArgsFqfn := []string{"start", + "--fqfn", "public/default/test-functions-start-fqfn", + } + + _, _, err = TestFunctionsCommands(startFunctionsCmd, startArgsFqfn) + assert.Nil(t, err) + + // test failure cases + + stopArgsFqfnAgain := []string{"stop", + "--fqfn", "public/default/test-functions-start-fqfn", + } + _, _, err = TestFunctionsCommands(stopFunctionsCmd, stopArgsFqfnAgain) + assert.Nil(t, err) + + // test the function name not exist + failureStartArgs := []string{"start", + "--name", "not-exist", + } + _, err, _ = TestFunctionsCommands(startFunctionsCmd, failureStartArgs) + assert.NotNil(t, err) + failMsg := "Function not-exist doesn't exist" + assert.True(t, strings.ContainsAny(err.Error(), failMsg)) + + // test the --name args not exist + notExistNameOrFqfnArgs := []string{"start", + "--tenant", "public", + "--namespace", "default", + } + _, err, _ = TestFunctionsCommands(startFunctionsCmd, notExistNameOrFqfnArgs) + assert.NotNil(t, err) + failNameMsg := "you must specify a name for the function or a Fully Qualified Function Name (FQFN)" + assert.True(t, strings.ContainsAny(err.Error(), failNameMsg)) + + // test the instance id not exist + notExistInstanceIDArgs := []string{"start", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-start-fqfn", + "--instance-id", "12345678", + } + _, err, _ = TestFunctionsCommands(startFunctionsCmd, notExistInstanceIDArgs) + assert.NotNil(t, err) + failInstanceIDMsg := "Operation not permitted" + assert.True(t, strings.ContainsAny(err.Error(), failInstanceIDMsg)) +} diff --git a/pkg/ctl/functions/stop.go b/pkg/ctl/functions/stop.go index 1844e67c..e1184f4c 100644 --- a/pkg/ctl/functions/stop.go +++ b/pkg/ctl/functions/stop.go @@ -64,12 +64,27 @@ func stopFunctionsCmd(vc *cmdutils.VerbCmd) { Out: "Stopped successfully", } - out = append(out, successOut) + failOut := pulsar.Output{ + Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args", + Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)", + } + + failOutWithNameNotExist := pulsar.Output{ + Desc: "The name of Pulsar Functions doesn't exist, please check the --name args", + Out: "[✖] code: 404 reason: Function doesn't exist", + } + + failOutWithWrongInstanceID := pulsar.Output{ + Desc: "Used an instanceID that does not exist or other impermissible actions", + Out: "[✖] code: 400 reason: Operation not permitted", + } + + out = append(out, successOut, failOut, failOutWithNameNotExist, failOutWithWrongInstanceID) desc.CommandOutput = out vc.SetDescription( "stop", - "", + "Stops function instance", desc.ToString(), "stop", ) diff --git a/pkg/ctl/functions/stop_test.go b/pkg/ctl/functions/stop_test.go index f574cc42..908160fd 100644 --- a/pkg/ctl/functions/stop_test.go +++ b/pkg/ctl/functions/stop_test.go @@ -20,6 +20,7 @@ package functions import ( "github.com/stretchr/testify/assert" "os" + "strings" "testing" ) @@ -39,7 +40,7 @@ func TestStopFunctions(t *testing.T) { "--jar", jarName, } - _, err = TestFunctionsCommands(createFunctionsCmd, args) + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) assert.Nil(t, err) stopArgs := []string{"stop", @@ -48,26 +49,77 @@ func TestStopFunctions(t *testing.T) { "--name", "test-functions-stop", } - _, err = TestFunctionsCommands(stopFunctionsCmd, stopArgs) + _, _, err = TestFunctionsCommands(stopFunctionsCmd, stopArgs) assert.Nil(t, err) - argsFqfn := []string{"create", - "--tenant", "public", - "--namespace", "default", - "--name", "test-functions-stop-fqfn", - "--inputs", "test-input-topic", - "--output", "persistent://public/default/test-output-topic", - "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", - "--jar", jarName, - } - - _, err = TestFunctionsCommands(createFunctionsCmd, argsFqfn) - assert.Nil(t, err) - - stopArgsFqfn := []string{"stop", - "--fqfn", "public/default/test-functions-stop-fqfn", - } - - _, err = TestFunctionsCommands(stopFunctionsCmd, stopArgsFqfn) - assert.Nil(t, err) + argsFqfn := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-stop-fqfn", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, argsFqfn) + assert.Nil(t, err) + + stopArgsFqfn := []string{"stop", + "--fqfn", "public/default/test-functions-stop-fqfn", + } + + _, _, err = TestFunctionsCommands(stopFunctionsCmd, stopArgsFqfn) + assert.Nil(t, err) +} + +func TestStopFunctionsWithFailure(t *testing.T) { + jarName := "dummyExample.jar" + _, err := os.Create(jarName) + assert.Nil(t, err) + + defer os.Remove(jarName) + args := []string{"create", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-stop-failure", + "--inputs", "test-input-topic", + "--output", "persistent://public/default/test-output-topic", + "--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction", + "--jar", jarName, + } + + _, _, err = TestFunctionsCommands(createFunctionsCmd, args) + assert.Nil(t, err) + + // test the function name not exist + failureDeleteArgs := []string{"stop", + "--name", "not-exist", + } + _, err, _ = TestFunctionsCommands(stopFunctionsCmd, failureDeleteArgs) + assert.NotNil(t, err) + failMsg := "Function not-exist doesn't exist" + assert.True(t, strings.ContainsAny(err.Error(), failMsg)) + + // test the --name args not exist + notExistNameOrFqfnArgs := []string{"stop", + "--tenant", "public", + "--namespace", "default", + } + _, err, _ = TestFunctionsCommands(stopFunctionsCmd, notExistNameOrFqfnArgs) + assert.NotNil(t, err) + failNameMsg := "you must specify a name for the function or a Fully Qualified Function Name (FQFN)" + assert.True(t, strings.ContainsAny(err.Error(), failNameMsg)) + + // test the instance id not exist + notExistInstanceIDArgs := []string{"stop", + "--tenant", "public", + "--namespace", "default", + "--name", "test-functions-stop-failure", + "--instance-id", "12345678", + } + _, err, _ = TestFunctionsCommands(stopFunctionsCmd, notExistInstanceIDArgs) + assert.NotNil(t, err) + failInstanceIDMsg := "Operation not permitted" + assert.True(t, strings.ContainsAny(err.Error(), failInstanceIDMsg)) } diff --git a/pkg/ctl/functions/test_help.go b/pkg/ctl/functions/test_help.go index bafc2aba..b1ba4318 100644 --- a/pkg/ctl/functions/test_help.go +++ b/pkg/ctl/functions/test_help.go @@ -25,7 +25,7 @@ import ( "os" ) -func TestFunctionsCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (out *bytes.Buffer, err error) { +func TestFunctionsCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (out *bytes.Buffer, execErr, err error) { var rootCmd = &cobra.Command{ Use: "pulsarctl [command]", Short: "a CLI for Apache Pulsar", @@ -36,6 +36,11 @@ func TestFunctionsCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) ( }, } + var execError error + cmdutils.ExecErrorHandler = func(err error) { + execError = err + } + buf := new(bytes.Buffer) rootCmd.SetOut(buf) rootCmd.SetArgs(append([]string{"functions"}, args...)) @@ -50,7 +55,7 @@ func TestFunctionsCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) ( rootCmd.AddCommand(resourceCmd) err = rootCmd.Execute() - return buf, err + return buf, execError, err } var ( diff --git a/pkg/ctl/functions/util.go b/pkg/ctl/functions/util.go index 30756e4b..20f4dfb4 100644 --- a/pkg/ctl/functions/util.go +++ b/pkg/ctl/functions/util.go @@ -196,20 +196,6 @@ func processArgs(funcData *pulsar.FunctionData) error { funcData.FuncConf.OutputSchemaType = funcData.SchemaType } - // processingGuarantees default value is 0, means AtLeastOnce. - if funcData.ProcessingGuarantees != "" { - switch funcData.ProcessingGuarantees { - case "ATMOST_ONCE": - funcData.FuncConf.ProcessingGuarantees = pulsar.AtMostOnce - case "EFFECTIVELY_ONCE": - funcData.FuncConf.ProcessingGuarantees = pulsar.EffectivelyOnce - case "ATLEAST_ONCE": - funcData.FuncConf.ProcessingGuarantees = pulsar.AtLeasetOnce - default: - funcData.FuncConf.ProcessingGuarantees = pulsar.AtLeasetOnce - } - } - if funcData.RetainOrdering { funcData.FuncConf.RetainOrdering = funcData.RetainOrdering } @@ -336,13 +322,6 @@ func processArgs(funcData *pulsar.FunctionData) error { } func validateFunctionConfigs(functionConfig *pulsar.FunctionConfig) error { - // go doesn't need className - if functionConfig.Runtime == pulsar.Python || functionConfig.Runtime == pulsar.Java { - if functionConfig.ClassName == "" { - return errors.New("no Function Classname specified") - } - } - if functionConfig.Name == "" { inferMissingFunctionName(functionConfig) } @@ -380,6 +359,21 @@ func validateFunctionConfigs(functionConfig *pulsar.FunctionConfig) error { return errors.New("the specified go file does not exist") } + if functionConfig.Jar != "" { + functionConfig.Runtime = pulsar.JavaRuntime + } else if functionConfig.Py != "" { + functionConfig.Runtime = pulsar.PythonRuntime + } else if functionConfig.Go != "" { + functionConfig.Runtime = pulsar.GoRuntime + } + + // go doesn't need className + if functionConfig.Runtime == pulsar.JavaRuntime || functionConfig.Runtime == pulsar.PythonRuntime { + if functionConfig.ClassName == "" { + return errors.New("no Function Classname specified") + } + } + return nil } @@ -416,3 +410,10 @@ func processBaseArguments(funcData *pulsar.FunctionData) error { return nil } + +func processNamespaceCmd(funcData *pulsar.FunctionData) { + if funcData.Tenant == "" || funcData.Namespace == "" { + funcData.Tenant = PublicTenant + funcData.Namespace = DefaultNamespace + } +} diff --git a/pkg/pulsar/functionConfg.go b/pkg/pulsar/functionConfg.go index 01c0ffdf..13eb356e 100644 --- a/pkg/pulsar/functionConfg.go +++ b/pkg/pulsar/functionConfg.go @@ -17,20 +17,10 @@ package pulsar -type ProcessingGuarantees int - -type Runtime int - -const ( - AtLeasetOnce ProcessingGuarantees = iota - AtMostOnce - EffectivelyOnce -) - const ( - Java Runtime = iota - Python - Go + JavaRuntime = "JAVA" + PythonRuntime = "PYTHON" + GoRuntime = "GO" ) type FunctionConfig struct { @@ -58,7 +48,7 @@ type FunctionConfig struct { OutputSerdeClassName string `json:"outputSerdeClassName" yaml:"outputSerdeClassName"` LogTopic string `json:"logTopic" yaml:"logTopic"` - ProcessingGuarantees ProcessingGuarantees `json:"processingGuarantees" yaml:"processingGuarantees"` + ProcessingGuarantees string `json:"processingGuarantees" yaml:"processingGuarantees"` RetainOrdering bool `json:"retainOrdering" yaml:"retainOrdering"` UserConfig map[string]interface{} `json:"userConfig" yaml:"userConfig"` @@ -69,7 +59,7 @@ type FunctionConfig struct { // SecretProviderConfigurator.getSecretObjectType() method. Secrets map[string]interface{} `json:"secrets" yaml:"secrets"` - Runtime Runtime `json:"runtime" yaml:"runtime"` + Runtime string `json:"runtime" yaml:"runtime"` AutoAck bool `json:"autoAck" yaml:"autoAck"` MaxMessageRetries int `json:"maxMessageRetries" yaml:"maxMessageRetries"` DeadLetterTopic string `json:"deadLetterTopic" yaml:"deadLetterTopic"` @@ -78,7 +68,7 @@ type FunctionConfig struct { Resources *Resources `json:"resources" yaml:"resources"` FQFN string `json:"fqfn" yaml:"fqfn"` WindowConfig *WindowConfig `json:"windowConfig" yaml:"windowConfig"` - TimeoutMs *int64 `json:"timeoutMs" yaml:"timeoutMs"` + TimeoutMs *int64 `json:"timeoutMs" yaml:"timeoutMs"` Jar string `json:"jar" yaml:"jar"` Py string `json:"py" yaml:"py"` Go string `json:"go" yaml:"go"` diff --git a/pkg/pulsar/functions.go b/pkg/pulsar/functions.go index d5352039..089c8c50 100644 --- a/pkg/pulsar/functions.go +++ b/pkg/pulsar/functions.go @@ -49,6 +49,27 @@ type Functions interface { // Stop function instance StopFunctionWithID(tenant, namespace, name string, instanceID int) error + + // Delete an existing function + DeleteFunction(tenant, namespace, name string) error + + // Start all function instances + StartFunction(tenant, namespace, name string) error + + // Start function instance + StartFunctionWithID(tenant, namespace, name string, instanceID int) error + + // Restart all function instances + RestartFunction(tenant, namespace, name string) error + + // Restart function instance + RestartFunctionWithID(tenant, namespace, name string, instanceID int) error + + // Get the list of functions + GetFunctions(tenant, namespace string) ([]string, error) + + // Get the configuration for the specified function + GetFunction(tenant, namespace, name string) (FunctionConfig, error) } type functions struct { @@ -183,21 +204,55 @@ func (f *functions) CreateFuncWithUrl(funcConf *FunctionConfig, pkgUrl string) e func (f *functions) StopFunction(tenant, namespace, name string) error { endpoint := f.client.endpoint(f.basePath, tenant, namespace, name) - err := f.client.post(endpoint+"/stop", "", nil) - if err != nil { - return err - } - return nil + return f.client.post(endpoint+"/stop", "", nil) } func (f *functions) StopFunctionWithID(tenant, namespace, name string, instanceID int) error { id := fmt.Sprintf("%d", instanceID) endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, id) - err := f.client.post(endpoint+"/stop", "", nil) - if err != nil { - return err - } + return f.client.post(endpoint+"/stop", "", nil) +} + +func (f *functions) DeleteFunction(tenant, namespace, name string) error { + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name) + return f.client.delete(endpoint, nil) +} + +func (f *functions) StartFunction(tenant, namespace, name string) error { + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name) + return f.client.post(endpoint+"/start", "", nil) +} + +func (f *functions) StartFunctionWithID(tenant, namespace, name string, instanceID int) error { + id := fmt.Sprintf("%d", instanceID) + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, id) + + return f.client.post(endpoint+"/start", "", nil) +} + +func (f *functions) RestartFunction(tenant, namespace, name string) error { + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name) + return f.client.post(endpoint+"/restart", "", nil) +} + +func (f *functions) RestartFunctionWithID(tenant, namespace, name string, instanceID int) error { + id := fmt.Sprintf("%d", instanceID) + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name, id) + + return f.client.post(endpoint+"/restart", "", nil) +} + +func (f *functions) GetFunctions(tenant, namespace string) ([]string, error) { + var functions []string + endpoint := f.client.endpoint(f.basePath, tenant, namespace) + err := f.client.get(endpoint, &functions) + return functions, err +} - return nil +func (f *functions) GetFunction(tenant, namespace, name string) (FunctionConfig, error) { + var functionConfig FunctionConfig + endpoint := f.client.endpoint(f.basePath, tenant, namespace, name) + err := f.client.get(endpoint, &functionConfig) + return functionConfig, err } diff --git a/pkg/pulsar/sinkConfig.go b/pkg/pulsar/sinkConfig.go index 75dc3cd0..db28bd56 100644 --- a/pkg/pulsar/sinkConfig.go +++ b/pkg/pulsar/sinkConfig.go @@ -38,7 +38,7 @@ type SinkConfig struct { // SecretProviderConfigurator.getSecretObjectType() method. Secrets map[string]interface{} Parallelism int - ProcessingGuarantees ProcessingGuarantees + ProcessingGuarantees string RetainOrdering bool Resources Resources AutoAck bool diff --git a/pkg/pulsar/sourceConfig.go b/pkg/pulsar/sourceConfig.go index b0e80dc1..1a1b263a 100644 --- a/pkg/pulsar/sourceConfig.go +++ b/pkg/pulsar/sourceConfig.go @@ -37,7 +37,7 @@ type SourceConfig struct { Secrets map[string]interface{} Parallelism int - ProcessingGuarantees ProcessingGuarantees + ProcessingGuarantees string Resources Resources Archive string // Any flags that you want to pass to the runtime.