Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 11 additions & 13 deletions pkg/ctl/cluster/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import (
"os"
)

func TestClusterCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (out *bytes.Buffer, execErr, nameErr, err error) {

func TestClusterCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (out *bytes.Buffer, execErr, nameErr, err error) {
var execError error
cmdutils.ExecErrorHandler = func(err error) {
execError = err
Expand All @@ -20,9 +19,9 @@ func TestClusterCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (ou
nameError = err
}

var rootCmd = &cobra.Command {
Use: "pulsarctl [command]",
Short: "a CLI for Apache Pulsar",
var rootCmd = &cobra.Command{
Use: "pulsarctl [command]",
Short: "a CLI for Apache Pulsar",
Run: func(cmd *cobra.Command, _ []string) {
if err := cmd.Help(); err != nil {
logger.Debug("ignoring error %q", err.Error())
Expand All @@ -34,7 +33,6 @@ func TestClusterCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string) (ou
rootCmd.SetOut(buf)
rootCmd.SetArgs(append([]string{"clusters"}, args...))


resourceCmd := cmdutils.NewResourceCmd(
"clusters",
"Operations about cluster(s)",
Expand All @@ -53,10 +51,10 @@ var (
basePath string
)

func TestTlsHelp(newVerb func(cmd *cmdutils.VerbCmd), args []string)(out *bytes.Buffer, err error) {
var rootCmd = &cobra.Command {
Use: "pulsarctl [command]",
Short: "a CLI for Apache Pulsar",
func TestTlsHelp(newVerb func(cmd *cmdutils.VerbCmd), args []string) (out *bytes.Buffer, err error) {
var rootCmd = &cobra.Command{
Use: "pulsarctl [command]",
Short: "a CLI for Apache Pulsar",
Run: func(cmd *cobra.Command, _ []string) {
if err := cmd.Help(); err != nil {
logger.Debug("ignoring error %q", err.Error())
Expand Down Expand Up @@ -84,9 +82,9 @@ func TestTlsHelp(newVerb func(cmd *cmdutils.VerbCmd), args []string)(out *bytes.

baseArgs := []string{
"--auth-params",
"{\"tlsCertFile\":\""+basePath+"/test/auth/certs/client-cert.pem\""+
",\"tlsKeyFile\":\""+basePath+"/test/auth/certs/client-key.pem\"}",
"--tls-trust-cert-pat", basePath+"/test/auth/certs/cacert.pem",
"{\"tlsCertFile\":\"" + basePath + "/test/auth/certs/client-cert.pem\"" +
",\"tlsKeyFile\":\"" + basePath + "/test/auth/certs/client-key.pem\"}",
"--tls-trust-cert-pat", basePath + "/test/auth/certs/cacert.pem",
"--admin-service-url", "https://localhost:8443",
"--tls-allow-insecure"}

Expand Down
6 changes: 3 additions & 3 deletions pkg/ctl/functions/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestCreateFunctions(t *testing.T) {
"--processing-guarantees", "EFFECTIVELY_ONCE",
}

_, err = TestFunctionsCommands(createFunctionsCmd, args)
_, _, err = TestFunctionsCommands(createFunctionsCmd, args)
assert.Nil(t, err)

// $ bin/pulsar-admin functions create
Expand All @@ -66,7 +66,7 @@ func TestCreateFunctions(t *testing.T) {
"--jar", jarName,
}

_, err = TestFunctionsCommands(createFunctionsCmd, argsWithConf)
_, _, err = TestFunctionsCommands(createFunctionsCmd, argsWithConf)
assert.Nil(t, err)

argsWithFileUrl := []string{"create",
Expand All @@ -80,6 +80,6 @@ func TestCreateFunctions(t *testing.T) {
"--processing-guarantees", "EFFECTIVELY_ONCE",
}

_, err = TestFunctionsCommands(createFunctionsCmd, argsWithFileUrl)
_, _, err = TestFunctionsCommands(createFunctionsCmd, argsWithFileUrl)
assert.Nil(t, err)
}
135 changes: 135 additions & 0 deletions pkg/ctl/functions/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package functions

import (
"github.com/spf13/pflag"
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar"
)

func deleteFunctionsCmd(vc *cmdutils.VerbCmd) {
desc := pulsar.LongDescription{}
desc.CommandUsedFor = "This command is used for delete a Pulsar Function that is running on a Pulsar cluster."
desc.CommandPermission = "This command requires super-user permissions."

var examples []pulsar.Example

del := pulsar.Example{
Desc: "Delete a Pulsar Function that is running on a Pulsar cluster",
Command: "pulsarctl functions delete \n" +
"\t--tenant public\n" +
"\t--namespace default\n" +
"\t--name <the name of Pulsar Functions>",
}
examples = append(examples, del)

delWithInstanceID := pulsar.Example{
Desc: "Delete a Pulsar Function that is running on a Pulsar cluster with instance ID",
Command: "pulsarctl functions delete \n" +
"\t--tenant public\n" +
"\t--namespace default\n" +
"\t--name <the name of Pulsar Functions> \n" +
"\t--instance-id 1",
}
examples = append(examples, delWithInstanceID)

delWithFqfn := pulsar.Example{
Desc: "Delete a Pulsar Function that is running on a Pulsar cluster with FQFN",
Command: "pulsarctl functions delete \n" +
"\t--fqfn tenant/namespace/name [eg: public/default/ExampleFunctions]",
}
examples = append(examples, delWithFqfn)
desc.CommandExamples = examples

var out []pulsar.Output
successOut := pulsar.Output{
Desc: "normal output",
Out: "Deleted successfully",
}

failOut := pulsar.Output{
Desc: "You must specify a name for the Pulsar Functions or a FQFN, please check the --name args",
Out: "[✖] you must specify a name for the function or a Fully Qualified Function Name (FQFN)",
}

failOutWithNameNotExist := pulsar.Output{
Desc: "The name of Pulsar Functions doesn't exist, please check the --name args",
Out: "[✖] code: 404 reason: Function <your function name> doesn't exist",
}

out = append(out, successOut, failOut, failOutWithNameNotExist)
desc.CommandOutput = out

vc.SetDescription(
"delete",
"Delete a Pulsar Function that is running on a Pulsar cluster",
desc.ToString(),
"delete",
)

functionData := &pulsar.FunctionData{}

// set the run function
vc.SetRunFunc(func() error {
return doDeleteFunctions(vc, functionData)
})

// register the params
vc.FlagSetGroup.InFlagSet("FunctionsConfig", func(flagSet *pflag.FlagSet) {
flagSet.StringVar(
&functionData.FQFN,
"fqfn",
"",
"The Fully Qualified Function Name (FQFN) for the function")

flagSet.StringVar(
&functionData.Tenant,
"tenant",
"",
"The tenant of a Pulsar Function")

flagSet.StringVar(
&functionData.Namespace,
"namespace",
"",
"The namespace of a Pulsar Function")

flagSet.StringVar(
&functionData.FuncName,
"name",
"",
"The name of a Pulsar Function")
})
}

func doDeleteFunctions(vc *cmdutils.VerbCmd, funcData *pulsar.FunctionData) error {
err := processBaseArguments(funcData)
if err != nil {
vc.Command.Help()
return err
}
admin := cmdutils.NewPulsarClientWithApiVersion(pulsar.V3)
err = admin.Functions().DeleteFunction(funcData.Tenant, funcData.Namespace, funcData.FuncName)
if err != nil {
return err
}

vc.Command.Printf("Deleted successfully")
return nil
}
113 changes: 113 additions & 0 deletions pkg/ctl/functions/delete_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package functions

import (
"github.com/stretchr/testify/assert"
"os"
"strings"
"testing"
)

func TestDeleteFunctions(t *testing.T) {
jarName := "dummyExample.jar"
_, err := os.Create(jarName)
assert.Nil(t, err)

defer os.Remove(jarName)
args := []string{"create",
"--tenant", "public",
"--namespace", "default",
"--name", "test-functions-delete",
"--inputs", "test-input-topic",
"--output", "persistent://public/default/test-output-topic",
"--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction",
"--jar", jarName,
}

_, _, err = TestFunctionsCommands(createFunctionsCmd, args)
assert.Nil(t, err)

deleteArgs := []string{"delete",
"--tenant", "public",
"--namespace", "default",
"--name", "test-functions-delete",
}

_, _, err = TestFunctionsCommands(deleteFunctionsCmd, deleteArgs)
assert.Nil(t, err)

argsFqfn := []string{"create",
"--tenant", "public",
"--namespace", "default",
"--name", "test-functions-delete-fqfn",
"--inputs", "test-input-topic",
"--output", "persistent://public/default/test-output-topic",
"--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction",
"--jar", jarName,
}

_, _, err = TestFunctionsCommands(createFunctionsCmd, argsFqfn)
assert.Nil(t, err)

deleteArgsFqfn := []string{"delete",
"--fqfn", "public/default/test-functions-delete-fqfn",
}

_, _, err = TestFunctionsCommands(deleteFunctionsCmd, deleteArgsFqfn)
assert.Nil(t, err)
}

func TestDeleteFunctionsWithFailure(t *testing.T) {
jarName := "dummyExample.jar"
_, err := os.Create(jarName)
assert.Nil(t, err)

defer os.Remove(jarName)
args := []string{"create",
"--tenant", "public",
"--namespace", "default",
"--name", "test-functions-delete-failure",
"--inputs", "test-input-topic",
"--output", "persistent://public/default/test-output-topic",
"--classname", "org.apache.pulsar.functions.api.examples.ExclamationFunction",
"--jar", jarName,
}

_, _, err = TestFunctionsCommands(createFunctionsCmd, args)

assert.Nil(t, err)

failureDeleteArgs := []string{"delete",
"--name", "not-exist",
}

_, execErrMsg, _ := TestFunctionsCommands(deleteFunctionsCmd, failureDeleteArgs)
assert.NotNil(t, execErrMsg)
exceptMsg := "Function not-exist doesn't exist"
assert.True(t, strings.ContainsAny(execErrMsg.Error(), exceptMsg))

notExistNameOrFqfnArgs := []string{"delete",
"--tenant", "public",
"--namespace", "default",
}
_, execErrMsg, _ = TestFunctionsCommands(deleteFunctionsCmd, notExistNameOrFqfnArgs)
failMsg := "you must specify a name for the function or a Fully Qualified Function Name (FQFN)"
assert.NotNil(t, execErrMsg)
assert.True(t, strings.ContainsAny(execErrMsg.Error(), failMsg))
}
5 changes: 5 additions & 0 deletions pkg/ctl/functions/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {

cmdutils.AddVerbCmd(flagGrouping, resourceCmd, createFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, stopFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, deleteFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, startFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, restartFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, listFunctionsCmd)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getFunctionsCmd)

return resourceCmd
}
Loading