diff --git a/go.mod b/go.mod index b9840863..13fc77cb 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/google/go-cmp v0.3.1 // indirect github.com/kris-nova/logger v0.0.0-20181127235838-fd0d87064b06 github.com/kris-nova/lolgopher v0.0.0-20180921204813-313b3abb0d9b // indirect + github.com/magiconair/properties v1.8.0 github.com/mattn/go-colorable v0.1.2 // indirect github.com/mattn/go-runewidth v0.0.4 // indirect github.com/olekukonko/tablewriter v0.0.1 @@ -19,4 +20,5 @@ require ( github.com/stretchr/testify v1.3.0 github.com/testcontainers/testcontainers-go v0.0.10 gopkg.in/yaml.v2 v2.2.2 + gotest.tools v0.0.0-20181223230014-1083505acf35 ) diff --git a/go.sum b/go.sum index f7eda5a0..614a7ce3 100644 --- a/go.sum +++ b/go.sum @@ -68,6 +68,7 @@ github.com/kris-nova/logger v0.0.0-20181127235838-fd0d87064b06 h1:vN4d3jSss3ExzU github.com/kris-nova/logger v0.0.0-20181127235838-fd0d87064b06/go.mod h1:++9BgZujZd4v0ZTZCb5iPsaomXdZWyxotIAh1IiDm44= github.com/kris-nova/lolgopher v0.0.0-20180921204813-313b3abb0d9b h1:xYEM2oBUhBEhQjrV+KJ9lEWDWYZoNVZUaBF++Wyljq4= github.com/kris-nova/lolgopher v0.0.0-20180921204813-313b3abb0d9b/go.mod h1:V0HF/ZBlN86HqewcDC/cVxMmYDiRukWjSrgKLUAn9Js= +github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU= github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= diff --git a/pkg/bkctl/autorecovery/decommission.go b/pkg/bkctl/autorecovery/decommission.go index e1b33dd4..51785204 100644 --- a/pkg/bkctl/autorecovery/decommission.go +++ b/pkg/bkctl/autorecovery/decommission.go @@ -29,7 +29,7 @@ func decommissionCmd(vc *cmdutils.VerbCmd) { var examples []cmdutils.Example c := cmdutils.Example{ Desc: "Decommission a bookie.", - Command: "pulsarctl bookkeeper auto-recovery (bk-ip:bk-port)", + Command: "pulsarctl bookkeeper auto-recovery decommission (bk-ip:bk-port)", } examples = append(examples, c) desc.CommandExamples = examples diff --git a/pkg/bkctl/autorecovery/lost_bookie_recovery_delay_test.go b/pkg/bkctl/autorecovery/lost_bookie_recovery_delay_test.go new file mode 100644 index 00000000..49048bce --- /dev/null +++ b/pkg/bkctl/autorecovery/lost_bookie_recovery_delay_test.go @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package autorecovery + +import ( + "context" + "testing" + + "github.com/streamnative/pulsarctl/pkg/test/bookkeeper" + + "github.com/stretchr/testify/assert" +) + +func TestGetLostBookieRecoveryDelayCmd(t *testing.T) { + // prepare the bookkeeper cluster environment + ctx := context.Background() + bk, err := bookkeeper.NewBookieCluster(&bookkeeper.ClusterSpec{ + ClusterName: "test-lost-bookie-recovery-delay", + NumBookies: 1, + BookieEnv: map[string]string{ + "BK_autoRecoveryDaemonEnabled": "true", + }, + }) + // nolint + defer bk.Close(ctx) + if err != nil { + t.Fatal(err) + } + + err = bk.Start(ctx) + // nolint + defer bk.Stop(ctx) + if err != nil { + t.Fatal(err) + } + + httpAddr, err := bk.GetHTTPServiceURL(ctx) + if err != nil { + t.Fatal(err) + } + + args := []string{"--bookie-service-url", httpAddr, "get-lost-bookie-recovery-delay"} + out, execErr, nameErr, err := testAutoRecoveryCommands(getLostBookieRecoveryDelayCmd, args) + if err != nil { + t.Fatal(err) + } + + if nameErr != nil { + t.Fatal(nameErr) + } + + if execErr != nil { + t.Fatal(execErr) + } + + assert.Equal(t, "lostBookieRecoveryDelay value: 0\n", out.String()) +} diff --git a/pkg/bkctl/autorecovery/test_help.go b/pkg/bkctl/autorecovery/test_help.go index 900286b5..e34de7ff 100644 --- a/pkg/bkctl/autorecovery/test_help.go +++ b/pkg/bkctl/autorecovery/test_help.go @@ -43,6 +43,7 @@ func testAutoRecoveryCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string buf := new(bytes.Buffer) rootCmd.SetOut(buf) rootCmd.SetArgs(append([]string{"auto-recovery"}, args...)) + rootCmd.PersistentFlags().AddFlagSet(cmdutils.PulsarCtlConfig.FlagSet()) resourceCmd := cmdutils.NewResourceCmd( "auto-recovery", diff --git a/pkg/bkctl/autorecovery/trigger_audit_test.go b/pkg/bkctl/autorecovery/trigger_audit_test.go new file mode 100644 index 00000000..9cbe021a --- /dev/null +++ b/pkg/bkctl/autorecovery/trigger_audit_test.go @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package autorecovery + +import ( + "context" + "testing" + + "github.com/streamnative/pulsarctl/pkg/test/bookkeeper" + + "github.com/stretchr/testify/assert" +) + +func TestTriggerAuditCmd(t *testing.T) { + // prepare the bookkeeper cluster environment + ctx := context.Background() + bk, err := bookkeeper.NewBookieCluster(&bookkeeper.ClusterSpec{ + ClusterName: "test-trigger-audit", + NumBookies: 1, + BookieEnv: map[string]string{ + "BK_autoRecoveryDaemonEnabled": "true", + }, + }) + // nolint + defer bk.Close(ctx) + if err != nil { + t.Fatal(err) + } + + err = bk.Start(ctx) + // nolint + defer bk.Stop(ctx) + if err != nil { + t.Fatal(err) + } + + httpAddr, err := bk.GetHTTPServiceURL(ctx) + if err != nil { + t.Fatal(err) + } + + args := []string{"--bookie-service-url", httpAddr, "trigger-audit"} + out, execErr, nameErr, err := testAutoRecoveryCommands(triggerAuditCmd, args) + if err != nil { + t.Fatal(err) + } + if nameErr != nil { + t.Fatal(nameErr) + } + if execErr != nil { + t.Fatal(execErr) + } + + assert.Equal(t, "Successfully trigger audit by resetting the lost bookie recovery delay.\n", out.String()) +} diff --git a/pkg/bkctl/autorecovery/who_is_auditor.go b/pkg/bkctl/autorecovery/who_is_auditor.go index 0693bf14..deffe907 100644 --- a/pkg/bkctl/autorecovery/who_is_auditor.go +++ b/pkg/bkctl/autorecovery/who_is_auditor.go @@ -59,7 +59,7 @@ func doWhoIsAuditor(vc *cmdutils.VerbCmd) error { admin := cmdutils.NewBookieClient() auditor, err := admin.AutoRecovery().WhoIsAuditor() if err == nil { - cmdutils.PrintJSON(vc.Command.OutOrStdout(), auditor) + vc.Command.Println(auditor) } return err diff --git a/pkg/bkctl/autorecovery/who_is_auditor_test.go b/pkg/bkctl/autorecovery/who_is_auditor_test.go new file mode 100644 index 00000000..f4395086 --- /dev/null +++ b/pkg/bkctl/autorecovery/who_is_auditor_test.go @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package autorecovery + +import ( + "context" + "testing" + + "github.com/streamnative/pulsarctl/pkg/test/bookkeeper" + + "github.com/stretchr/testify/assert" +) + +func TestWhoIsAuditor(t *testing.T) { + // prepare the bookkeeper cluster environment + ctx := context.Background() + bk, err := bookkeeper.NewBookieCluster(&bookkeeper.ClusterSpec{ + ClusterName: "test-who-is-auditor", + NumBookies: 1, + BookieEnv: map[string]string{ + "BK_autoRecoveryDaemonEnabled": "true", + }, + }) + // nolint + defer bk.Close(ctx) + if err != nil { + t.Fatal(err) + } + + err = bk.Start(ctx) + // nolint + defer bk.Stop(ctx) + if err != nil { + t.Fatal(err) + } + + allBK := bk.GetAllBookieContainerID() + + httpAddr, err := bk.GetHTTPServiceURL(ctx) + if err != nil { + t.Fatal(err) + } + + args := []string{"--bookie-service-url", httpAddr, "who-is-auditor"} + out, execErr, nameErr, err := testAutoRecoveryCommands(whoIsAuditorCmd, args) + if err != nil { + t.Fatal(err) + } + if nameErr != nil { + t.Fatal(nameErr) + } + if execErr != nil { + t.Fatal(execErr) + } + + assert.Contains(t, out.String(), allBK[0][:12]) +} diff --git a/pkg/bookkeeper/autorecovery.go b/pkg/bookkeeper/autorecovery.go index f1d2e5f1..3e1d14bf 100644 --- a/pkg/bookkeeper/autorecovery.go +++ b/pkg/bookkeeper/autorecovery.go @@ -31,7 +31,7 @@ type AutoRecovery interface { PrintListUnderReplicatedLedger(string, string) (map[int64][]string, error) // WhoIsAuditor is used to getting which bookie is the auditor - WhoIsAuditor() (map[string]string, error) + WhoIsAuditor() (string, error) // TriggerAudit is used to triggering audit by resetting the lostBookieRecoveryDelay TriggerAudit() error @@ -90,10 +90,10 @@ func (a *autoRecovery) PrintListUnderReplicatedLedger(missingReplica, return resp, err } -func (a *autoRecovery) WhoIsAuditor() (map[string]string, error) { +func (a *autoRecovery) WhoIsAuditor() (string, error) { endpoint := a.bk.endpoint(a.basePath, "/who_is_auditor") - resp := make(map[string]string) - return resp, a.bk.Client.Get(endpoint, &resp) + resp, err := a.bk.Client.GetWithQueryParams(endpoint, nil, nil, false) + return string(resp), err } func (a *autoRecovery) TriggerAudit() error { diff --git a/pkg/test/bookkeeper/bkctl.go b/pkg/test/bookkeeper/bkctl.go new file mode 100644 index 00000000..31ade9d2 --- /dev/null +++ b/pkg/test/bookkeeper/bkctl.go @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package bookkeeper + +import ( + "fmt" + "strings" + + "github.com/pkg/errors" + "github.com/streamnative/pulsarctl/pkg/test" +) + +const bkctl = "/opt/bookkeeper/bin/bkctl" + +func ListBookies(containerID string) ([]string, error) { + bookies := make([]string, 0) + bookieStr, err := test.ExecCmd(containerID, []string{bkctl, "bookies", "list"}) + if err != nil { + return nil, err + } + fmt.Println(bookieStr) + lines := strings.Split(bookieStr, "\n") + for _, v := range lines { + if strings.TrimSpace(v) == "ReadWrite Bookies :" || + strings.TrimSpace(v) == "All Bookies :" || + strings.TrimSpace(v) == "" { + continue + } + bookieAddr := strings.Split(v, ":") + if len(bookieAddr) != 2 { + return nil, errors.Errorf("get bookies address '%s' encountered unexpected error", v) + } + bookieIP := strings.Split(bookieAddr[0], "(") + if len(bookieIP) != 2 { + return nil, errors.Errorf("get bookies address '%s' encountered unexpected error", v) + } + bookies = append(bookies, fmt.Sprintf("%s:%s", bookieIP[0], bookieAddr[1])) + } + return bookies, nil +} diff --git a/pkg/test/bookkeeper/cluster.go b/pkg/test/bookkeeper/cluster.go index 493eae9c..c7deafec 100644 --- a/pkg/test/bookkeeper/cluster.go +++ b/pkg/test/bookkeeper/cluster.go @@ -30,10 +30,12 @@ import ( ) var ( - LatestImage = "apache/bookkeeper:latest" + LatestImage = "apache/bookkeeper:latest" + BookKeeper4_10_0 = "apache/bookkeeper:4.10.0" ) type ClusterDef struct { + test.Cluster clusterSpec *ClusterSpec networkName string network testcontainers.Network @@ -41,11 +43,12 @@ type ClusterDef struct { bookieContainers map[string]*test.BaseContainer } -func DefaultCluster() (test.Cluster, error) { +func DefaultCluster() (*ClusterDef, error) { return NewBookieCluster(DefaultClusterSpec()) } -func NewBookieCluster(spec *ClusterSpec) (test.Cluster, error) { +func NewBookieCluster(spec *ClusterSpec) (*ClusterDef, error) { + spec = GetClusterSpec(spec) c := &ClusterDef{clusterSpec: spec} c.networkName = spec.ClusterName + test.RandomSuffix() network, err := test.NewNetwork(c.networkName) @@ -69,10 +72,12 @@ func getBookieContainers(c *ClusterSpec, networkName, zkServers string) map[stri "BK_zkServers": zkServers, "BK_httpServerEnabled": "true", "BK_httpServerPort": strconv.Itoa(c.BookieHTTPServicePort), + "BK_httpServerClass": "org.apache.bookkeeper.http.vertx.VertxHttpServer", "BK_ledgerDirectories": "bk/ledgers", "BK_indexDirectories": "bk/ledgers", "BK_journalDirectory": "bk/journal", }) + bookie.WithEnv(c.BookieEnv) bookies[name] = bookie } return bookies @@ -140,3 +145,11 @@ func (c *ClusterDef) getABookie() *test.BaseContainer { } return nil } + +func (c *ClusterDef) GetAllBookieContainerID() []string { + containerIDs := make([]string, 0) + for _, v := range c.bookieContainers { + containerIDs = append(containerIDs, v.GetContainerID()) + } + return containerIDs +} diff --git a/pkg/test/bookkeeper/cluster_spec.go b/pkg/test/bookkeeper/cluster_spec.go index 284a1762..5a228b4f 100644 --- a/pkg/test/bookkeeper/cluster_spec.go +++ b/pkg/test/bookkeeper/cluster_spec.go @@ -17,7 +17,11 @@ package bookkeeper -import "github.com/streamnative/pulsarctl/pkg/test/bookkeeper/containers" +import ( + "strings" + + "github.com/streamnative/pulsarctl/pkg/test/bookkeeper/containers" +) type ClusterSpec struct { Image string @@ -26,11 +30,12 @@ type ClusterSpec struct { BookieServicePort int BookieHTTPServicePort int ZookeeperServicePort int + BookieEnv map[string]string } func DefaultClusterSpec() *ClusterSpec { return &ClusterSpec{ - Image: LatestImage, + Image: BookKeeper4_10_0, ClusterName: "default-bookie", NumBookies: 1, BookieServicePort: containers.DefaultBookieServicePort, @@ -38,3 +43,37 @@ func DefaultClusterSpec() *ClusterSpec { ZookeeperServicePort: containers.DefaultZookeeperServicePort, } } + +func GetClusterSpec(spec *ClusterSpec) *ClusterSpec { + newSpec := DefaultClusterSpec() + + if spec.NumBookies > 0 { + newSpec.NumBookies = spec.NumBookies + } + + if strings.TrimSpace(spec.ClusterName) != "" { + newSpec.ClusterName = spec.ClusterName + } + + if strings.TrimSpace(spec.Image) != "" { + newSpec.Image = spec.Image + } + + if spec.ZookeeperServicePort > 0 { + newSpec.ZookeeperServicePort = spec.ZookeeperServicePort + } + + if spec.BookieServicePort > 0 { + newSpec.BookieServicePort = spec.BookieServicePort + } + + if spec.BookieHTTPServicePort > 0 { + newSpec.BookieHTTPServicePort = spec.BookieHTTPServicePort + } + + if spec.BookieEnv != nil { + newSpec.BookieEnv = spec.BookieEnv + } + + return newSpec +} diff --git a/pkg/test/utils.go b/pkg/test/utils.go index af2d1864..35e6ed1a 100644 --- a/pkg/test/utils.go +++ b/pkg/test/utils.go @@ -19,6 +19,7 @@ package test import ( "context" + "os/exec" "strconv" "time" @@ -43,3 +44,10 @@ func NewNetwork(name string) (testcontainers.Network, error) { func RandomSuffix() string { return "-" + strconv.FormatInt(time.Now().Unix(), 10) } + +func ExecCmd(containerID string, cmd []string) (string, error) { + args := []string{"exec", containerID} + args = append(args, cmd...) + out, err := exec.Command("docker", args...).Output() + return string(out), err +}