Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/google/go-cmp v0.3.1 // indirect
github.com/kris-nova/logger v0.0.0-20181127235838-fd0d87064b06
github.com/kris-nova/lolgopher v0.0.0-20180921204813-313b3abb0d9b // indirect
github.com/magiconair/properties v1.8.0
github.com/mattn/go-colorable v0.1.2 // indirect
github.com/mattn/go-runewidth v0.0.4 // indirect
github.com/olekukonko/tablewriter v0.0.1
Expand All @@ -19,4 +20,5 @@ require (
github.com/stretchr/testify v1.3.0
github.com/testcontainers/testcontainers-go v0.0.10
gopkg.in/yaml.v2 v2.2.2
gotest.tools v0.0.0-20181223230014-1083505acf35
)
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ github.com/kris-nova/logger v0.0.0-20181127235838-fd0d87064b06 h1:vN4d3jSss3ExzU
github.com/kris-nova/logger v0.0.0-20181127235838-fd0d87064b06/go.mod h1:++9BgZujZd4v0ZTZCb5iPsaomXdZWyxotIAh1IiDm44=
github.com/kris-nova/lolgopher v0.0.0-20180921204813-313b3abb0d9b h1:xYEM2oBUhBEhQjrV+KJ9lEWDWYZoNVZUaBF++Wyljq4=
github.com/kris-nova/lolgopher v0.0.0-20180921204813-313b3abb0d9b/go.mod h1:V0HF/ZBlN86HqewcDC/cVxMmYDiRukWjSrgKLUAn9Js=
github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
Expand Down
2 changes: 1 addition & 1 deletion pkg/bkctl/autorecovery/decommission.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func decommissionCmd(vc *cmdutils.VerbCmd) {
var examples []cmdutils.Example
c := cmdutils.Example{
Desc: "Decommission a bookie.",
Command: "pulsarctl bookkeeper auto-recovery (bk-ip:bk-port)",
Command: "pulsarctl bookkeeper auto-recovery decommission (bk-ip:bk-port)",
}
examples = append(examples, c)
desc.CommandExamples = examples
Expand Down
72 changes: 72 additions & 0 deletions pkg/bkctl/autorecovery/lost_bookie_recovery_delay_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package autorecovery

import (
"context"
"testing"

"github.com/streamnative/pulsarctl/pkg/test/bookkeeper"

"github.com/stretchr/testify/assert"
)

func TestGetLostBookieRecoveryDelayCmd(t *testing.T) {
// prepare the bookkeeper cluster environment
ctx := context.Background()
bk, err := bookkeeper.NewBookieCluster(&bookkeeper.ClusterSpec{
ClusterName: "test-lost-bookie-recovery-delay",
NumBookies: 1,
BookieEnv: map[string]string{
"BK_autoRecoveryDaemonEnabled": "true",
},
})
// nolint
defer bk.Close(ctx)
if err != nil {
t.Fatal(err)
}

err = bk.Start(ctx)
// nolint
defer bk.Stop(ctx)
if err != nil {
t.Fatal(err)
}

httpAddr, err := bk.GetHTTPServiceURL(ctx)
if err != nil {
t.Fatal(err)
}

args := []string{"--bookie-service-url", httpAddr, "get-lost-bookie-recovery-delay"}
out, execErr, nameErr, err := testAutoRecoveryCommands(getLostBookieRecoveryDelayCmd, args)
if err != nil {
t.Fatal(err)
}

if nameErr != nil {
t.Fatal(nameErr)
}

if execErr != nil {
t.Fatal(execErr)
}

assert.Equal(t, "lostBookieRecoveryDelay value: 0\n", out.String())
}
1 change: 1 addition & 0 deletions pkg/bkctl/autorecovery/test_help.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func testAutoRecoveryCommands(newVerb func(cmd *cmdutils.VerbCmd), args []string
buf := new(bytes.Buffer)
rootCmd.SetOut(buf)
rootCmd.SetArgs(append([]string{"auto-recovery"}, args...))
rootCmd.PersistentFlags().AddFlagSet(cmdutils.PulsarCtlConfig.FlagSet())

resourceCmd := cmdutils.NewResourceCmd(
"auto-recovery",
Expand Down
70 changes: 70 additions & 0 deletions pkg/bkctl/autorecovery/trigger_audit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package autorecovery

import (
"context"
"testing"

"github.com/streamnative/pulsarctl/pkg/test/bookkeeper"

"github.com/stretchr/testify/assert"
)

func TestTriggerAuditCmd(t *testing.T) {
// prepare the bookkeeper cluster environment
ctx := context.Background()
bk, err := bookkeeper.NewBookieCluster(&bookkeeper.ClusterSpec{
ClusterName: "test-trigger-audit",
NumBookies: 1,
BookieEnv: map[string]string{
"BK_autoRecoveryDaemonEnabled": "true",
},
})
// nolint
defer bk.Close(ctx)
if err != nil {
t.Fatal(err)
}

err = bk.Start(ctx)
// nolint
defer bk.Stop(ctx)
if err != nil {
t.Fatal(err)
}

httpAddr, err := bk.GetHTTPServiceURL(ctx)
if err != nil {
t.Fatal(err)
}

args := []string{"--bookie-service-url", httpAddr, "trigger-audit"}
out, execErr, nameErr, err := testAutoRecoveryCommands(triggerAuditCmd, args)
if err != nil {
t.Fatal(err)
}
if nameErr != nil {
t.Fatal(nameErr)
}
if execErr != nil {
t.Fatal(execErr)
}

assert.Equal(t, "Successfully trigger audit by resetting the lost bookie recovery delay.\n", out.String())
}
2 changes: 1 addition & 1 deletion pkg/bkctl/autorecovery/who_is_auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func doWhoIsAuditor(vc *cmdutils.VerbCmd) error {
admin := cmdutils.NewBookieClient()
auditor, err := admin.AutoRecovery().WhoIsAuditor()
if err == nil {
cmdutils.PrintJSON(vc.Command.OutOrStdout(), auditor)
vc.Command.Println(auditor)
}

return err
Expand Down
72 changes: 72 additions & 0 deletions pkg/bkctl/autorecovery/who_is_auditor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package autorecovery

import (
"context"
"testing"

"github.com/streamnative/pulsarctl/pkg/test/bookkeeper"

"github.com/stretchr/testify/assert"
)

func TestWhoIsAuditor(t *testing.T) {
// prepare the bookkeeper cluster environment
ctx := context.Background()
bk, err := bookkeeper.NewBookieCluster(&bookkeeper.ClusterSpec{
ClusterName: "test-who-is-auditor",
NumBookies: 1,
BookieEnv: map[string]string{
"BK_autoRecoveryDaemonEnabled": "true",
},
})
// nolint
defer bk.Close(ctx)
if err != nil {
t.Fatal(err)
}

err = bk.Start(ctx)
// nolint
defer bk.Stop(ctx)
if err != nil {
t.Fatal(err)
}

allBK := bk.GetAllBookieContainerID()

httpAddr, err := bk.GetHTTPServiceURL(ctx)
if err != nil {
t.Fatal(err)
}

args := []string{"--bookie-service-url", httpAddr, "who-is-auditor"}
out, execErr, nameErr, err := testAutoRecoveryCommands(whoIsAuditorCmd, args)
if err != nil {
t.Fatal(err)
}
if nameErr != nil {
t.Fatal(nameErr)
}
if execErr != nil {
t.Fatal(execErr)
}

assert.Contains(t, out.String(), allBK[0][:12])
}
8 changes: 4 additions & 4 deletions pkg/bookkeeper/autorecovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type AutoRecovery interface {
PrintListUnderReplicatedLedger(string, string) (map[int64][]string, error)

// WhoIsAuditor is used to getting which bookie is the auditor
WhoIsAuditor() (map[string]string, error)
WhoIsAuditor() (string, error)

// TriggerAudit is used to triggering audit by resetting the lostBookieRecoveryDelay
TriggerAudit() error
Expand Down Expand Up @@ -90,10 +90,10 @@ func (a *autoRecovery) PrintListUnderReplicatedLedger(missingReplica,
return resp, err
}

func (a *autoRecovery) WhoIsAuditor() (map[string]string, error) {
func (a *autoRecovery) WhoIsAuditor() (string, error) {
endpoint := a.bk.endpoint(a.basePath, "/who_is_auditor")
resp := make(map[string]string)
return resp, a.bk.Client.Get(endpoint, &resp)
resp, err := a.bk.Client.GetWithQueryParams(endpoint, nil, nil, false)
return string(resp), err
}

func (a *autoRecovery) TriggerAudit() error {
Expand Down
55 changes: 55 additions & 0 deletions pkg/test/bookkeeper/bkctl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package bookkeeper

import (
"fmt"
"strings"

"github.com/pkg/errors"
"github.com/streamnative/pulsarctl/pkg/test"
)

const bkctl = "/opt/bookkeeper/bin/bkctl"

func ListBookies(containerID string) ([]string, error) {
bookies := make([]string, 0)
bookieStr, err := test.ExecCmd(containerID, []string{bkctl, "bookies", "list"})
if err != nil {
return nil, err
}
fmt.Println(bookieStr)
lines := strings.Split(bookieStr, "\n")
for _, v := range lines {
if strings.TrimSpace(v) == "ReadWrite Bookies :" ||
strings.TrimSpace(v) == "All Bookies :" ||
strings.TrimSpace(v) == "" {
continue
}
bookieAddr := strings.Split(v, ":")
if len(bookieAddr) != 2 {
return nil, errors.Errorf("get bookies address '%s' encountered unexpected error", v)
}
bookieIP := strings.Split(bookieAddr[0], "(")
if len(bookieIP) != 2 {
return nil, errors.Errorf("get bookies address '%s' encountered unexpected error", v)
}
bookies = append(bookies, fmt.Sprintf("%s:%s", bookieIP[0], bookieAddr[1]))
}
return bookies, nil
}
19 changes: 16 additions & 3 deletions pkg/test/bookkeeper/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,25 @@ import (
)

var (
LatestImage = "apache/bookkeeper:latest"
LatestImage = "apache/bookkeeper:latest"
BookKeeper4_10_0 = "apache/bookkeeper:4.10.0"
)

type ClusterDef struct {
test.Cluster
clusterSpec *ClusterSpec
networkName string
network testcontainers.Network
zkContainer *test.BaseContainer
bookieContainers map[string]*test.BaseContainer
}

func DefaultCluster() (test.Cluster, error) {
func DefaultCluster() (*ClusterDef, error) {
return NewBookieCluster(DefaultClusterSpec())
}

func NewBookieCluster(spec *ClusterSpec) (test.Cluster, error) {
func NewBookieCluster(spec *ClusterSpec) (*ClusterDef, error) {
spec = GetClusterSpec(spec)
c := &ClusterDef{clusterSpec: spec}
c.networkName = spec.ClusterName + test.RandomSuffix()
network, err := test.NewNetwork(c.networkName)
Expand All @@ -69,10 +72,12 @@ func getBookieContainers(c *ClusterSpec, networkName, zkServers string) map[stri
"BK_zkServers": zkServers,
"BK_httpServerEnabled": "true",
"BK_httpServerPort": strconv.Itoa(c.BookieHTTPServicePort),
"BK_httpServerClass": "org.apache.bookkeeper.http.vertx.VertxHttpServer",
"BK_ledgerDirectories": "bk/ledgers",
"BK_indexDirectories": "bk/ledgers",
"BK_journalDirectory": "bk/journal",
})
bookie.WithEnv(c.BookieEnv)
bookies[name] = bookie
}
return bookies
Expand Down Expand Up @@ -140,3 +145,11 @@ func (c *ClusterDef) getABookie() *test.BaseContainer {
}
return nil
}

func (c *ClusterDef) GetAllBookieContainerID() []string {
containerIDs := make([]string, 0)
for _, v := range c.bookieContainers {
containerIDs = append(containerIDs, v.GetContainerID())
}
return containerIDs
}
Loading