From ffdbad2516fc6564de2b6252497b9db300500dbc Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Wed, 10 Jul 2019 18:57:12 -0700 Subject: [PATCH 1/3] Initialize the pulsarctl project *Modifications* - setup the project layout - add two clusters command --- .gitignore | 4 + README.md | 9 ++ go.mod | 14 ++ go.sum | 47 +++++++ main.go | 69 ++++++++++ pkg/cmdutils/cmdutils.go | 64 +++++++++ pkg/cmdutils/config.go | 38 +++++ pkg/cmdutils/group.go | 106 ++++++++++++++ pkg/cmdutils/verb.go | 55 ++++++++ pkg/ctl/cluster/cluster.go | 19 +++ pkg/ctl/cluster/create.go | 71 ++++++++++ pkg/ctl/cluster/list.go | 37 +++++ pkg/pulsar/admin.go | 276 +++++++++++++++++++++++++++++++++++++ pkg/pulsar/cluster.go | 48 +++++++ pkg/pulsar/data.go | 11 ++ pkg/pulsar/errors.go | 19 +++ pkg/pulsar/utils.go | 7 + 17 files changed, 894 insertions(+) create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go create mode 100644 pkg/cmdutils/cmdutils.go create mode 100644 pkg/cmdutils/config.go create mode 100644 pkg/cmdutils/group.go create mode 100644 pkg/cmdutils/verb.go create mode 100644 pkg/ctl/cluster/cluster.go create mode 100644 pkg/ctl/cluster/create.go create mode 100644 pkg/ctl/cluster/list.go create mode 100644 pkg/pulsar/admin.go create mode 100644 pkg/pulsar/cluster.go create mode 100644 pkg/pulsar/data.go create mode 100644 pkg/pulsar/errors.go create mode 100644 pkg/pulsar/utils.go diff --git a/.gitignore b/.gitignore index f1c181ec..57d30eb0 100644 --- a/.gitignore +++ b/.gitignore @@ -4,9 +4,13 @@ *.dll *.so *.dylib +pulsarctl # Test binary, build with `go test -c` *.test # Output of the go coverage tool, specifically when used with LiteIDE *.out + +# Intellij +.idea diff --git a/README.md b/README.md index fb1ab8ad..85c7ceea 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,11 @@ # pulsarctl + a CLI for Apache Pulsar + +## Build + +``` +export GO111MODULE=on + +go build -o pulsarctl +``` diff --git a/go.mod b/go.mod new file mode 100644 index 00000000..2bd7af3d --- /dev/null +++ b/go.mod @@ -0,0 +1,14 @@ +module github.com/streamnative/pulsarctl + +go 1.12 + +require ( + github.com/fatih/color v1.7.0 // indirect + github.com/kris-nova/logger v0.0.0-20181127235838-fd0d87064b06 + github.com/kris-nova/lolgopher v0.0.0-20180921204813-313b3abb0d9b // indirect + github.com/mattn/go-colorable v0.1.2 // indirect + github.com/mattn/go-runewidth v0.0.4 // indirect + github.com/olekukonko/tablewriter v0.0.1 + github.com/spf13/cobra v0.0.5 + github.com/spf13/pflag v1.0.3 +) diff --git a/go.sum b/go.sum new file mode 100644 index 00000000..a025d289 --- /dev/null +++ b/go.sum @@ -0,0 +1,47 @@ +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= +github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= +github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/kris-nova/logger v0.0.0-20181127235838-fd0d87064b06 h1:vN4d3jSss3ExzUn2cE0WctxztfOgiKvMKnDrydBsg00= +github.com/kris-nova/logger v0.0.0-20181127235838-fd0d87064b06/go.mod h1:++9BgZujZd4v0ZTZCb5iPsaomXdZWyxotIAh1IiDm44= +github.com/kris-nova/lolgopher v0.0.0-20180921204813-313b3abb0d9b h1:xYEM2oBUhBEhQjrV+KJ9lEWDWYZoNVZUaBF++Wyljq4= +github.com/kris-nova/lolgopher v0.0.0-20180921204813-313b3abb0d9b/go.mod h1:V0HF/ZBlN86HqewcDC/cVxMmYDiRukWjSrgKLUAn9Js= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU= +github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE= +github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y= +github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/olekukonko/tablewriter v0.0.1 h1:b3iUnf1v+ppJiOfNX4yxxqfWKMQPZR5yoh8urCTFX88= +github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cobra v0.0.5 h1:f0B+LkLX6DtmRH1isoNA9VTtNUK9K8xYd28JNNfOv/s= +github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= +github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/main.go b/main.go new file mode 100644 index 00000000..64ff3925 --- /dev/null +++ b/main.go @@ -0,0 +1,69 @@ +package main + +import ( + "fmt" + "github.com/kris-nova/logger" + "github.com/spf13/cobra" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/ctl/cluster" + "os" +) + +var rootCmd = &cobra.Command { + Use: "pulsarctl [command]", + Short: "a CLI for Apache Pulsar", + Run: func(cmd *cobra.Command, _ []string) { + if err := cmd.Help(); err != nil { + logger.Debug("ignoring error %q", err.Error()) + } + }, +} + +func init() { + + var colorValue string + + flagGrouping := cmdutils.NewGrouping() + + addCommands(flagGrouping) + + rootCmd.PersistentFlags().BoolP("help", "h", false, "help for this command") + rootCmd.PersistentFlags().StringVarP(&colorValue, "color", "C", "true", "toggle colorized logs (true,false,fabulous)") + rootCmd.PersistentFlags().IntVarP(&logger.Level, "verbose", "v", 3, "set log level, use 0 to silence, 4 for debugging") + // add the common pulsarctl flags + rootCmd.PersistentFlags().AddFlagSet(cmdutils.PulsarCtlConfig.FlagSet()) + + cobra.OnInitialize(func() { + // Control colored output + color := true + fabulous := false + switch colorValue { + case "false": + color = false + case "fabulous": + color = false + fabulous = true + } + logger.Color = color + logger.Fabulous = fabulous + + // Add timestamps for debugging + logger.Timestamps = false + if logger.Level >= 4 { + logger.Timestamps = true + } + }) + + rootCmd.SetUsageFunc(flagGrouping.Usage) +} + +func addCommands(flagGrouping *cmdutils.FlagGrouping) { + rootCmd.AddCommand(cluster.Command(flagGrouping)) +} + +func main() { + if err := rootCmd.Execute(); err != nil { + fmt.Println(err) // outputs cobra errors + os.Exit(-1) + } +} diff --git a/pkg/cmdutils/cmdutils.go b/pkg/cmdutils/cmdutils.go new file mode 100644 index 00000000..572449ca --- /dev/null +++ b/pkg/cmdutils/cmdutils.go @@ -0,0 +1,64 @@ +package cmdutils + +import ( + "encoding/json" + "fmt" + "github.com/kris-nova/logger" + "github.com/spf13/cobra" + "github.com/streamnative/pulsarctl/pkg/pulsar" + "io" + "os" + "strings" +) + +const IncompatibleFlags = "cannot be used at the same time" + +// NewVerbCmd defines a standard resource command +func NewResourceCmd(use, short, long string, aliases ...string) *cobra.Command { + return &cobra.Command{ + Use: use, + Short: short, + Long: long, + Aliases: aliases, + Run: func(cmd *cobra.Command, _ []string) { + if err := cmd.Help(); err != nil { + logger.Debug("ignoring error %q", err.Error()) + } + }, + } +} + +// GetNameArg tests to ensure there is only 1 name argument +func GetNameArg(args []string) string { + if len(args) > 1 { + logger.Critical("only one argument is allowed to be used as a name") + os.Exit(1) + } + if len(args) == 1 { + return strings.TrimSpace(args[0]) + } + return "" +} + +func NewPulsarClient() pulsar.Client { + return PulsarCtlConfig.Client() +} + +func PrintJson(w io.Writer, obj interface{}) { + b, err := json.MarshalIndent(obj, "", " ") + if err != nil { + fmt.Fprintf(w, "unexpected response type: %v\n", err) + return + } + fmt.Fprintln(w, string(b)) +} + + +func PrintError(w io.Writer, err error) { + msg := err.Error() + if pulsar.IsAdminError(err) { + ae, _ := err.(pulsar.Error) + msg = ae.Reason + } + fmt.Fprintln(w, "error:", msg) +} \ No newline at end of file diff --git a/pkg/cmdutils/config.go b/pkg/cmdutils/config.go new file mode 100644 index 00000000..3a2fe2c8 --- /dev/null +++ b/pkg/cmdutils/config.go @@ -0,0 +1,38 @@ +package cmdutils + +import ( + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/pulsar" +) + +var PulsarCtlConfig = ClusterConfig{} + +// the configuration of the cluster that pulsarctl connects to +type ClusterConfig struct { + // the web service url that pulsarctl connects to. Default is http://localhost:8080 + WebServiceUrl string +} + +func (c *ClusterConfig) FlagSet() *pflag.FlagSet { + flags := pflag.NewFlagSet( + "PulsarCtl Config", + pflag.ContinueOnError) + flags.StringVarP( + &c.WebServiceUrl, + "admin-service-url", + "s", + pulsar.DefaultWebServiceURL, + "The admin web service url that pulsarctl connects to.") + return flags +} + +func (c *ClusterConfig) Client() pulsar.Client { + config := pulsar.DefaultConfig() + + if len(c.WebServiceUrl) > 0 && c.WebServiceUrl != config.WebServiceUrl { + config.WebServiceUrl = c.WebServiceUrl + } + + return pulsar.New(config) +} + diff --git a/pkg/cmdutils/group.go b/pkg/cmdutils/group.go new file mode 100644 index 00000000..0882ba39 --- /dev/null +++ b/pkg/cmdutils/group.go @@ -0,0 +1,106 @@ +package cmdutils + +import ( + "fmt" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "strings" + "unicode" +) + +// FlagGrouping holds a superset of all flagsets for all commands +type FlagGrouping struct { + groups map[*cobra.Command]*NamedFlagSetGroup +} + +type namedFlagSet struct { + name string + fs *pflag.FlagSet +} + +// NamedFlagSetGroup holds a single group of flagsets +type NamedFlagSetGroup struct { + list []namedFlagSet +} + +// NewGrouping creates an instance of Grouping +func NewGrouping() *FlagGrouping { + return &FlagGrouping{ + make(map[*cobra.Command]*NamedFlagSetGroup), + } +} + +// New creates a new group of flagsets for use with a subcommand +func (g *FlagGrouping) New(cmd *cobra.Command) *NamedFlagSetGroup { + n := &NamedFlagSetGroup{} + g.groups[cmd] = n + return n +} + +// InFlagSet returns new or existing named FlagSet in a group +func (n *NamedFlagSetGroup) InFlagSet(name string, cb func(*pflag.FlagSet)) { + for _, nfs := range n.list { + if nfs.name == name { + cb(nfs.fs) + return + } + } + + nfs := namedFlagSet{ + name: name, + fs: &pflag.FlagSet{}, + } + cb(nfs.fs) + n.list = append(n.list, nfs) +} + +// AddTo mixes all flagsets in the given group to another flagset +func (n *NamedFlagSetGroup) AddTo(cmd *cobra.Command) { + for _, nfs := range n.list { + cmd.Flags().AddFlagSet(nfs.fs) + } +} + +// Usage is for use with (*cobra.Command).SetUsageFunc +func (g *FlagGrouping) Usage(cmd *cobra.Command) error { + if cmd == nil { + return fmt.Errorf("nil command") + } + + group := g.groups[cmd] + + usage := []string{fmt.Sprintf("Usage: %s", cmd.UseLine())} + + if cmd.HasAvailableSubCommands() { + usage = append(usage, "\nCommands:") + for _, subCommand := range cmd.Commands() { + usage = append(usage, fmt.Sprintf(" %-10s %s", subCommand.Name(), subCommand.Short)) + } + } + + if len(cmd.Aliases) > 0 { + usage = append(usage, "\nAliases: " + cmd.NameAndAliases()) + } + + if group != nil { + for _, nfs := range group.list { + usage = append(usage, fmt.Sprintf("\n%s flags:", nfs.name)) + usage = append(usage, strings.TrimRightFunc(nfs.fs.FlagUsages(), unicode.IsSpace)) + } + } + + usage = append(usage, "\nCommon flags:") + if len(cmd.PersistentFlags().FlagUsages()) != 0 { + usage = append(usage, strings.TrimRightFunc(cmd.PersistentFlags().FlagUsages(), unicode.IsSpace)) + } + if len(cmd.InheritedFlags().FlagUsages()) != 0 { + usage = append(usage, strings.TrimRightFunc(cmd.InheritedFlags().FlagUsages(), unicode.IsSpace)) + } + + usage = append(usage, fmt.Sprintf("\nUse '%s [command] --help' for more information about a command.\n", cmd.CommandPath())) + + cmd.Println(strings.Join(usage, "\n")) + + return nil +} + diff --git a/pkg/cmdutils/verb.go b/pkg/cmdutils/verb.go new file mode 100644 index 00000000..0be6d8a9 --- /dev/null +++ b/pkg/cmdutils/verb.go @@ -0,0 +1,55 @@ +package cmdutils + +import ( + "github.com/kris-nova/logger" + "github.com/spf13/cobra" + "os" +) + +// VerbCmd holds attributes that most of the commands use +type VerbCmd struct { + Command *cobra.Command + FlagSetGroup *NamedFlagSetGroup + NameArg string +} + +// AddVerbCmd create a registers a new command under the given resource command +func AddVerbCmd(flagGrouping *FlagGrouping, parentResourceCmd *cobra.Command, newVerbCmd func(*VerbCmd)) { + verb := &VerbCmd { + Command: &cobra.Command{}, + } + verb.FlagSetGroup = flagGrouping.New(verb.Command) + newVerbCmd(verb) + verb.FlagSetGroup.AddTo(verb.Command) + parentResourceCmd.AddCommand(verb.Command) +} + +// SetDescription sets usage along with short and long descriptions as well as aliases +func (vc *VerbCmd) SetDescription(use, short, long string, aliases ...string) { + vc.Command.Use = use + vc.Command.Short = short + vc.Command.Long = long + vc.Command.Aliases = aliases +} + +// SetRunFunc registers a command function +func (vc *VerbCmd) SetRunFunc(cmd func() error) { + vc.Command.Run = func(_ *cobra.Command, _ []string) { + run(cmd) + } +} + +// SetRunFuncWithNameArg registers a command function with an optional name argument +func (vc *VerbCmd) SetRunFuncWithNameArg(cmd func() error) { + vc.Command.Run = func(_ *cobra.Command, args []string) { + vc.NameArg = GetNameArg(args) + run(cmd) + } +} + +func run(cmd func() error) { + if err := cmd(); err != nil { + logger.Critical("%s\n", err.Error()) + os.Exit(1) + } +} diff --git a/pkg/ctl/cluster/cluster.go b/pkg/ctl/cluster/cluster.go new file mode 100644 index 00000000..d5f831e7 --- /dev/null +++ b/pkg/ctl/cluster/cluster.go @@ -0,0 +1,19 @@ +package cluster + +import ( + "github.com/spf13/cobra" + "github.com/streamnative/pulsarctl/pkg/cmdutils" +) + +func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command { + resourceCmd := cmdutils.NewResourceCmd( + "clusters", + "Operations about cluster(s)", + "", + "cluster") + + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, createClusterCmd) + cmdutils.AddVerbCmd(flagGrouping, resourceCmd, listClustersCmd) + + return resourceCmd +} diff --git a/pkg/ctl/cluster/create.go b/pkg/ctl/cluster/create.go new file mode 100644 index 00000000..5ad5675e --- /dev/null +++ b/pkg/ctl/cluster/create.go @@ -0,0 +1,71 @@ +package cluster + +import ( + "github.com/spf13/pflag" + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar" +) + +func createClusterCmd(vc *cmdutils.VerbCmd) { + // update the description + vc.SetDescription( + "add", + "Add a pulsar cluster", + "This command is used for adding the configuration data for a cluster.\n" + + "The configuration data is mainly used for geo-replication between clusters,\n" + + "so please make sure the service urls provided in this command are reachable\n" + + "between clusters.\n" + + "\n" + + "This operation requires Pulsar super-user privileges.", + "create") + + clusterData := &pulsar.ClusterData{} + + // set the run function with name argument + vc.SetRunFuncWithNameArg(func() error { + return doCreateCluster(vc, clusterData) + }) + + // register the params + vc.FlagSetGroup.InFlagSet("ClusterData", func(flagSet *pflag.FlagSet) { + flagSet.StringVar( + &clusterData.ServiceURL, + "url", + "", + "Pulsar cluster web service url, e.g. http://example.pulsar.io:8080") + flagSet.StringVar( + &clusterData.ServiceURLTls, + "url-tls", + "", + "Pulsar cluster tls secured web service url, e.g. https://example.pulsar.io:8443") + flagSet.StringVar( + &clusterData.BrokerServiceURL, + "broker-url", + "", + "Pulsar cluster broker service url, e.g. pulsar://example.pulsar.io:6650") + flagSet.StringVar( + &clusterData.BrokerServiceURLTls, + "broker-url-tls", + "", + "Pulsar cluster tls secured broker service url, e.g. pulsar+ssl://example.pulsar.io:6651") + flagSet.StringArrayVarP( + &clusterData.PeerClusterNames, + "peer-cluster", + "p", + []string{""}, + "Cluster to be registered as a peer-cluster of this cluster.") + }) +} + +func doCreateCluster(vc *cmdutils.VerbCmd, clusterData *pulsar.ClusterData) error { + clusterData.Name = vc.NameArg + + admin := cmdutils.NewPulsarClient() + err := admin.Clusters().Create(*clusterData) + if err != nil { + cmdutils.PrintError(vc.Command.OutOrStderr(), err) + } else { + vc.Command.Printf("Cluster %s added\n", clusterData.Name) + } + return err +} diff --git a/pkg/ctl/cluster/list.go b/pkg/ctl/cluster/list.go new file mode 100644 index 00000000..cd4ee2f3 --- /dev/null +++ b/pkg/ctl/cluster/list.go @@ -0,0 +1,37 @@ +package cluster + +import ( + "github.com/olekukonko/tablewriter" + "github.com/streamnative/pulsarctl/pkg/cmdutils" +) + +func listClustersCmd(vc *cmdutils.VerbCmd) { + // update the description + vc.SetDescription( + "list", + "List the available pulsar clusters", + "This command is used for listing the list of available pulsar clusters.") + + // set the run function + vc.SetRunFunc(func() error { + return doListClusters(vc) + }) +} + +func doListClusters(vc *cmdutils.VerbCmd) error { + admin := cmdutils.NewPulsarClient() + clusters, err := admin.Clusters().List() + if err != nil { + cmdutils.PrintError(vc.Command.OutOrStderr(), err) + } else { + table := tablewriter.NewWriter(vc.Command.OutOrStdout()) + table.SetHeader([]string{ "Cluster Name" }) + + for _, c := range clusters { + table.Append([]string { c }) + } + + table.Render() + } + return err +} \ No newline at end of file diff --git a/pkg/pulsar/admin.go b/pkg/pulsar/admin.go new file mode 100644 index 00000000..942cd9c3 --- /dev/null +++ b/pkg/pulsar/admin.go @@ -0,0 +1,276 @@ +package pulsar + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "path" +) + +const ( + DefaultWebServiceURL = "http://localhost:8080" +) + +// Config is used to configure the admin client +type Config struct { + WebServiceUrl string + HttpClient *http.Client +} + +// DefaultConfig returns a default configuration for the pulsar admin client +func DefaultConfig() *Config { + config := &Config{ + WebServiceUrl: DefaultWebServiceURL, + HttpClient: http.DefaultClient, + } + return config +} + +// Client provides a client to the Pulsar Restful API +type Client interface { + Clusters() Clusters +} + +type client struct { + webServiceUrl string + apiVersion string + httpClient *http.Client +} + +// New returns a new client +func New(config *Config) Client { + defConfig := DefaultConfig() + + if len(config.WebServiceUrl) == 0 { + config.WebServiceUrl = defConfig.WebServiceUrl + } + + c := &client{ + // TODO: make api version configurable + apiVersion: "v2", + webServiceUrl: config.WebServiceUrl, + } + + return c +} + +func (c *client) endpoint(componentPath string, parts ...string) string { + return path.Join(makeHttpPath(c.apiVersion, componentPath), endpoint(parts...)) +} + +// get is used to do a GET request against an endpoint +// and deserialize the response into an interface +func (c *client) get(endpoint string, obj interface{}) error { + req, err := c.newRequest(http.MethodGet, endpoint) + if err != nil { + return err + } + + resp, err := checkSuccessful(c.doRequest(req)) + if err != nil { + return err + } + defer safeRespClose(resp) + + if obj != nil { + if err := decodeJsonBody(resp, &obj); err != nil { + return err + } + } + + return nil +} + +func (c *client) put(endpoint string, in, obj interface{}) error { + req, err := c.newRequest(http.MethodPut, endpoint) + if err != nil { + return err + } + req.obj = in + + resp, err := checkSuccessful(c.doRequest(req)) + if err != nil { + return err + } + defer safeRespClose(resp) + + if obj != nil { + if err := decodeJsonBody(resp, &obj); err != nil { + return err + } + } + + return nil +} + +func (c *client) delete(endpoint string, obj interface{}) error { + req, err := c.newRequest(http.MethodDelete, endpoint) + if err != nil { + return err + } + + resp, err := checkSuccessful(c.doRequest(req)) + if err != nil { + return err + } + defer safeRespClose(resp) + + if obj != nil { + if err := decodeJsonBody(resp, &obj); err != nil { + return err + } + } + + return nil +} + +type request struct { + method string + url *url.URL + params url.Values + + obj interface{} + body io.Reader +} + +func (r *request) toHTTP() (*http.Request, error) { + r.url.RawQuery = r.params.Encode() + + // add a request body if there is one + if r.body == nil && r.obj != nil { + body, err := encodeJsonBody(r.obj) + if err != nil { + return nil, err + } + r.body = body + } + + req, err := http.NewRequest(r.method, r.url.RequestURI(), r.body) + if err != nil { + return nil, err + } + + req.URL.Host = r.url.Host + req.URL.Scheme = r.url.Scheme + req.Host = r.url.Host + return req, nil +} + + +func (c *client) newRequest(method, path string) (*request, error) { + base, _ := url.Parse(c.webServiceUrl) + u, err := url.Parse(path) + if err != nil { + return nil, err + } + req := &request{ + method: method, + url: &url.URL{ + Scheme: base.Scheme, + User: base.User, + Host: base.Host, + Path: endpoint(base.Path, u.Path), + }, + params: make(url.Values), + } + return req, nil +} + +// TODO: add pulsarctl version +func (c *client) useragent() string { + return fmt.Sprintf("pulsarctl (go)") +} + +func (c *client) doRequest(r *request) (*http.Response, error) { + req, err := r.toHTTP() + if err != nil { + return nil, err + } + + // add default headers + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + req.Header.Set("User-Agent", c.useragent()) + + hc := c.httpClient + if hc == nil { + hc = http.DefaultClient + } + + resp, err := hc.Do(req) + return resp, err +} + + +// decodeJsonBody is used to JSON encode a body +func encodeJsonBody(obj interface{}) (io.Reader, error) { + buf := bytes.NewBuffer(nil) + enc := json.NewEncoder(buf) + if err := enc.Encode(obj); err != nil { + return nil, err + } + return buf, nil +} + + +// decodeJsonBody is used to JSON decode a body +func decodeJsonBody(resp *http.Response, out interface{}) error { + dec := json.NewDecoder(resp.Body) + return dec.Decode(out) +} + +// safeRespClose is used to close a respone body +func safeRespClose(resp *http.Response) { + if resp != nil { + resp.Body.Close() + } +} + +// responseError is used to parse a response into a pulsar error +func responseError(resp *http.Response) error { + var e Error + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + e.Reason = err.Error() + e.Code = resp.StatusCode + return e + } + + json.Unmarshal(body, &e) + e.Code = resp.StatusCode + + if e.Reason == "" { + e.Reason = unknownErrorReason + } + + return e +} + +// respIsOk is used to validate a successful http status code +func respIsOk(resp *http.Response) bool { + return resp.StatusCode >= http.StatusOK && resp.StatusCode <= http.StatusNoContent +} + +// checkSuccessful checks for a valid response and parses an error +func checkSuccessful(resp *http.Response, err error) (*http.Response, error) { + if err != nil { + safeRespClose(resp) + return nil, err + } + + if !respIsOk(resp) { + defer safeRespClose(resp) + return nil, responseError(resp) + } + + return resp, nil +} + +func endpoint(parts ...string) string { + return path.Join(parts...) +} + diff --git a/pkg/pulsar/cluster.go b/pkg/pulsar/cluster.go new file mode 100644 index 00000000..dd5f67e8 --- /dev/null +++ b/pkg/pulsar/cluster.go @@ -0,0 +1,48 @@ +package pulsar + +// Clusters is used to access the cluster endpoints. + +type Clusters interface { + List() ([]string, error) + Get(string) (ClusterData, error) + Create(ClusterData) error + Delete(string) error +} + +type clusters struct { + client *client + basePath string +} + +func (c *client) Clusters() Clusters { + return &clusters{ + client: c, + basePath:"/clusters", + } +} + +func (c *clusters) List() ([]string, error) { + var clusters []string + err := c.client.get(c.client.endpoint(c.basePath), &clusters) + return clusters, err +} + +func (c *clusters) Get(name string) (ClusterData, error) { + cdata := ClusterData{} + endpoint := c.client.endpoint(c.basePath, name) + err := c.client.get(endpoint, &cdata) + return cdata, err +} + +func (c *clusters) Create(cdata ClusterData) error { + endpoint := c.client.endpoint(c.basePath, cdata.Name) + err := c.client.put(endpoint, &cdata, nil) + return err +} + +func (c *clusters) Delete(name string) error { + endpoint := c.client.endpoint(c.basePath, name) + return c.client.delete(endpoint, nil) +} + + diff --git a/pkg/pulsar/data.go b/pkg/pulsar/data.go new file mode 100644 index 00000000..2ad69c71 --- /dev/null +++ b/pkg/pulsar/data.go @@ -0,0 +1,11 @@ +package pulsar + +// ClusterData information on a cluster +type ClusterData struct { + Name string `json:"-"` + ServiceURL string `json:"serviceUrl"` + ServiceURLTls string `json:"serviceUrlTls"` + BrokerServiceURL string `json:"brokerServiceUrl"` + BrokerServiceURLTls string `json:"brokerServiceUrlTls"` + PeerClusterNames []string `json:"peerClusterNames"` +} diff --git a/pkg/pulsar/errors.go b/pkg/pulsar/errors.go new file mode 100644 index 00000000..19cfd298 --- /dev/null +++ b/pkg/pulsar/errors.go @@ -0,0 +1,19 @@ +package pulsar + +import "fmt" + +const unknownErrorReason = "Unknown pulsar error" + +type Error struct { + Reason string `json:"reason"` + Code int +} + +func (e Error) Error() string { + return fmt.Sprintf("code: %d reason: %s", e.Code, e.Reason) +} + +func IsAdminError(err error) bool { + _, ok := err.(Error) + return ok +} diff --git a/pkg/pulsar/utils.go b/pkg/pulsar/utils.go new file mode 100644 index 00000000..fa62b7c2 --- /dev/null +++ b/pkg/pulsar/utils.go @@ -0,0 +1,7 @@ +package pulsar + +import "fmt" + +func makeHttpPath(apiVersion string, componentPath string) string { + return fmt.Sprintf("/admin/%s%s", apiVersion, componentPath) +} From b46ee577fc056b3a86ffc2a47982e35063dd6426 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Thu, 11 Jul 2019 18:05:01 -0700 Subject: [PATCH 2/3] Addressed review comments --- pkg/pulsar/admin.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/pkg/pulsar/admin.go b/pkg/pulsar/admin.go index 942cd9c3..a6f5768d 100644 --- a/pkg/pulsar/admin.go +++ b/pkg/pulsar/admin.go @@ -226,7 +226,9 @@ func decodeJsonBody(resp *http.Response, out interface{}) error { // safeRespClose is used to close a respone body func safeRespClose(resp *http.Response) { if resp != nil { - resp.Body.Close() + if err := resp.Body.Close(); err != nil { + // ignore error since it is closing a response body + } } } @@ -240,11 +242,16 @@ func responseError(resp *http.Response) error { return e } - json.Unmarshal(body, &e) - e.Code = resp.StatusCode + jsonErr := json.Unmarshal(body, &e) + + if jsonErr != nil { + e.Code = http.StatusPartialContent + } else { + e.Code = resp.StatusCode - if e.Reason == "" { - e.Reason = unknownErrorReason + if e.Reason == "" { + e.Reason = unknownErrorReason + } } return e From af2314d9c768c978d581815efd24d155aca70382 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Sun, 14 Jul 2019 11:54:24 -0700 Subject: [PATCH 3/3] Add auto-completion to pulsarctl --- main.go | 2 ++ pkg/ctl/completion/completion.go | 62 ++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+) create mode 100644 pkg/ctl/completion/completion.go diff --git a/main.go b/main.go index 64ff3925..cfbbf79d 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "github.com/spf13/cobra" "github.com/streamnative/pulsarctl/pkg/cmdutils" "github.com/streamnative/pulsarctl/pkg/ctl/cluster" + "github.com/streamnative/pulsarctl/pkg/ctl/completion" "os" ) @@ -59,6 +60,7 @@ func init() { func addCommands(flagGrouping *cmdutils.FlagGrouping) { rootCmd.AddCommand(cluster.Command(flagGrouping)) + rootCmd.AddCommand(completion.Command(rootCmd)) } func main() { diff --git a/pkg/ctl/completion/completion.go b/pkg/ctl/completion/completion.go new file mode 100644 index 00000000..f6f8dac4 --- /dev/null +++ b/pkg/ctl/completion/completion.go @@ -0,0 +1,62 @@ +package completion + +import ( + "github.com/kris-nova/logger" + "github.com/spf13/cobra" + "os" +) + +func Command(rootCmd *cobra.Command) *cobra.Command { + var bashCompletionCmd = &cobra.Command{ + Use: "bash", + Short: "Generates bash completion scripts", + Long: `To load completion run + +. <(pulsarctl completion bash) + +To configure your bash shell to load completions for each session add to your bashrc + +# ~/.bashrc or ~/.profile +. <(pulsarctl completion bash) + +If you are stuck on Bash 3 (macOS) use + +source /dev/stdin <<<"$(pulsarctl completion bash)" +`, + RunE: func(cmd *cobra.Command, args []string) error { + return rootCmd.GenBashCompletion(os.Stdout) + }, + } + + var zshCompletionCmd = &cobra.Command{ + Use: "zsh", + Short: "Generates zsh completion scripts", + Long: `To configure your zsh shell, run: + +mkdir -p ~/.zsh/completion/ +pulsarctl completion zsh > ~/.zsh/completion/_pulsarctl + +and put the following in ~/.zshrc: + +fpath=($fpath ~/.zsh/completion) +`, + RunE: func(cmd *cobra.Command, args []string) error { + return rootCmd.GenZshCompletion(os.Stdout) + }, + } + + cmd := &cobra.Command{ + Use: "completion", + Short: "Generates shell completion scripts", + Run: func(cmd *cobra.Command, args []string) { + if err := cmd.Help(); err != nil { + logger.Debug("ignoring error %q", err.Error()) + } + }, + } + + cmd.AddCommand(bashCompletionCmd) + cmd.AddCommand(zshCompletionCmd) + + return cmd +}