206 changes: 183 additions & 23 deletions cmd/networkdb-test/dbclient/ndbClient.go
@@ -2,6 +2,7 @@ package dbclient

import (
"context"
"fmt"
"io/ioutil"
"log"
"net"
@@ -25,17 +26,10 @@ type resultTuple struct {
}

func httpGetFatalError(ip, port, path string) {
// for {
body, err := httpGet(ip, port, path)
if err != nil || !strings.Contains(string(body), "OK") {
// if strings.Contains(err.Error(), "EOF") {
// logrus.Warnf("Got EOF path:%s err:%s", path, err)
// continue
// }
log.Fatalf("[%s] error %s %s", path, err, body)
}
// break
// }
}

func httpGet(ip, port, path string) ([]byte, error) {
@@ -87,7 +81,7 @@ func clusterPeersNumber(ip, port string, doneCh chan resultTuple) {
body, err := httpGet(ip, port, "/clusterpeers")

if err != nil {
logrus.Errorf("clusterPeers %s there was an error: %s\n", ip, err)
logrus.Errorf("clusterPeers %s there was an error: %s", ip, err)
doneCh <- resultTuple{id: ip, result: -1}
return
}
@@ -101,7 +95,7 @@ func networkPeersNumber(ip, port, networkName string, doneCh chan resultTuple) {
body, err := httpGet(ip, port, "/networkpeers?nid="+networkName)

if err != nil {
logrus.Errorf("networkPeersNumber %s there was an error: %s\n", ip, err)
logrus.Errorf("networkPeersNumber %s there was an error: %s", ip, err)
doneCh <- resultTuple{id: ip, result: -1}
return
}
@@ -115,7 +109,7 @@ func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan r
body, err := httpGet(ip, port, "/gettable?nid="+networkName+"&tname="+tableName)

if err != nil {
logrus.Errorf("tableEntriesNumber %s there was an error: %s\n", ip, err)
logrus.Errorf("tableEntriesNumber %s there was an error: %s", ip, err)
doneCh <- resultTuple{id: ip, result: -1}
return
}
@@ -124,6 +118,32 @@ func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan r
doneCh <- resultTuple{id: ip, result: entriesNum}
}

func dbEntriesNumber(ip, port, networkName string, doneCh chan resultTuple) {
body, err := httpGet(ip, port, "/networkstats?nid="+networkName)

if err != nil {
logrus.Errorf("entriesNumber %s there was an error: %s", ip, err)
doneCh <- resultTuple{id: ip, result: -1}
return
}
elementsRegexp := regexp.MustCompile(`entries: ([0-9]+)`)
entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
doneCh <- resultTuple{id: ip, result: entriesNum}
}

func dbQueueLength(ip, port, networkName string, doneCh chan resultTuple) {
body, err := httpGet(ip, port, "/networkstats?nid="+networkName)

if err != nil {
logrus.Errorf("queueLength %s there was an error: %s", ip, err)
doneCh <- resultTuple{id: ip, result: -1}
return
}
elementsRegexp := regexp.MustCompile(`qlen: ([0-9]+)`)
entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
doneCh <- resultTuple{id: ip, result: entriesNum}
}

func clientWatchTable(ip, port, networkName, tableName string, doneCh chan resultTuple) {
httpGetFatalError(ip, port, "/watchtable?nid="+networkName+"&tname="+tableName)
if doneCh != nil {
@@ -135,7 +155,7 @@ func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh ch
body, err := httpGet(ip, port, "/watchedtableentries?nid="+networkName+"&tname="+tableName)

if err != nil {
logrus.Errorf("clientTableEntriesNumber %s there was an error: %s\n", ip, err)
logrus.Errorf("clientTableEntriesNumber %s there was an error: %s", ip, err)
doneCh <- resultTuple{id: ip, result: -1}
return
}
@@ -144,6 +164,26 @@ func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh ch
doneCh <- resultTuple{id: ip, result: entriesNum}
}

func writeKeysNumber(ip, port, networkName, tableName, key string, number int, doneCh chan resultTuple) {
x := 0
for ; x < number; x++ {
k := key + strconv.Itoa(x)
// write key
writeTableKey(ip, port, networkName, tableName, k)
}
doneCh <- resultTuple{id: ip, result: x}
}

func deleteKeysNumber(ip, port, networkName, tableName, key string, number int, doneCh chan resultTuple) {
x := 0
for ; x < number; x++ {
k := key + strconv.Itoa(x)
// delete key
deleteTableKey(ip, port, networkName, tableName, k)
}
doneCh <- resultTuple{id: ip, result: x}
}

func writeUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
for x := 0; ; x++ {
select {
@@ -215,17 +255,18 @@ func ready(ip, port string, doneCh chan resultTuple) {
doneCh <- resultTuple{id: ip, result: 0}
}

func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) {
func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) (opTime time.Duration) {
startTime := time.Now().UnixNano()
var successTime int64

// Loop for 2 minutes to guartee that the result is stable
// Loop for 2 minutes to guarantee that the result is stable
for {
select {
case <-ctx.Done():
// Validate test success, if the time is set means that all the tables are empty
if successTime != 0 {
logrus.Infof("Check table passed, the cluster converged in %d msec", time.Duration(successTime-startTime)/time.Millisecond)
opTime = time.Duration(successTime-startTime) / time.Millisecond
logrus.Infof("Check table passed, the cluster converged in %d msec", opTime)
return
}
log.Fatal("Test failed, there are still entries in the tables of the nodes")
@@ -383,6 +424,107 @@ func doNetworkPeers(ips []string, args []string) {
close(doneCh)
}

// network-stats-queue networkName <gt/lt> queueSize
func doNetworkStatsQueue(ips []string, args []string) {
doneCh := make(chan resultTuple, len(ips))
networkName := args[0]
comparison := args[1]
size, _ := strconv.Atoi(args[2])

// check all the nodes
for _, ip := range ips {
go dbQueueLength(ip, servicePort, networkName, doneCh)
}

var avgQueueSize int
// wait for the readiness of all nodes
for i := len(ips); i > 0; i-- {
node := <-doneCh
switch comparison {
case "lt":
if node.result > size {
log.Fatalf("Expected queue size from %s to be %d < %d", node.id, node.result, size)
}
case "gt":
if node.result < size {
log.Fatalf("Expected queue size from %s to be %d > %d", node.id, node.result, size)
}
default:
log.Fatal("unknown comparison operator")
}
avgQueueSize += node.result
}
close(doneCh)
avgQueueSize /= len(ips)
fmt.Fprintf(os.Stderr, "doNetworkStatsQueue succeeded with avg queue:%d", avgQueueSize)
}

// write-keys networkName tableName parallelWriters numberOfKeysEach
func doWriteKeys(ips []string, args []string) {
networkName := args[0]
tableName := args[1]
parallelWriters, _ := strconv.Atoi(args[2])
numberOfKeys, _ := strconv.Atoi(args[3])

doneCh := make(chan resultTuple, parallelWriters)
// Enable watch of tables from clients
for i := 0; i < parallelWriters; i++ {
go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
}
waitWriters(parallelWriters, false, doneCh)

// Start parallel writers that will create and delete unique keys
defer close(doneCh)
for i := 0; i < parallelWriters; i++ {
key := "key-" + strconv.Itoa(i) + "-"
logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
go writeKeysNumber(ips[i], servicePort, networkName, tableName, key, numberOfKeys, doneCh)
}

// Sync with all the writers
keyMap := waitWriters(parallelWriters, true, doneCh)
logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])

// check table entries for 2 minutes
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
cancel()
fmt.Fprintf(os.Stderr, "doWriteKeys succeeded in %d msec", opTime)
}

// delete-keys networkName tableName parallelWriters numberOfKeysEach
func doDeleteKeys(ips []string, args []string) {
networkName := args[0]
tableName := args[1]
parallelWriters, _ := strconv.Atoi(args[2])
numberOfKeys, _ := strconv.Atoi(args[3])

doneCh := make(chan resultTuple, parallelWriters)
// Enable watch of tables from clients
for i := 0; i < parallelWriters; i++ {
go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
}
waitWriters(parallelWriters, false, doneCh)

// Start parallel writers that will create and delete unique keys
defer close(doneCh)
for i := 0; i < parallelWriters; i++ {
key := "key-" + strconv.Itoa(i) + "-"
logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
go deleteKeysNumber(ips[i], servicePort, networkName, tableName, key, numberOfKeys, doneCh)
}

// Sync with all the writers
keyMap := waitWriters(parallelWriters, true, doneCh)
logrus.Infof("Deleted a total of %d keys on the cluster", keyMap[totalWrittenKeys])

// check table entries for 2 minutes
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
cancel()
fmt.Fprintf(os.Stderr, "doDeleteKeys succeeded in %d msec", opTime)
}

// write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
func doWriteDeleteUniqueKeys(ips []string, args []string) {
networkName := args[0]
@@ -412,11 +554,12 @@ func doWriteDeleteUniqueKeys(ips []string, args []string) {

// check table entries for 2 minutes
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
opDBTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
cancel()
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber)
opClientTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber)
cancel()
fmt.Fprintf(os.Stderr, "doWriteDeleteUniqueKeys succeeded in %d msec and client %d msec", opDBTime, opClientTime)
}

// write-unique-keys networkName tableName numParallelWriters writeTimeSec
Expand Down Expand Up @@ -449,8 +592,9 @@ func doWriteUniqueKeys(ips []string, args []string) {

// check table entries for 2 minutes
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
cancel()
fmt.Fprintf(os.Stderr, "doWriteUniqueKeys succeeded in %d msec", opTime)
}

// write-delete-leave-join networkName tableName numParallelWriters writeTimeSec
@@ -477,8 +621,9 @@ func doWriteDeleteLeaveJoin(ips []string, args []string) {

// check table entries for 2 minutes
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
cancel()
fmt.Fprintf(os.Stderr, "doWriteDeleteLeaveJoin succeeded in %d msec", opTime)
}

// write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec
@@ -522,8 +667,9 @@ func doWriteDeleteWaitLeaveJoin(ips []string, args []string) {

// check table entries for 2 minutes
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
cancel()
fmt.Fprintf(os.Stderr, "doWriteDeleteWaitLeaveJoin succeeded in %d msec", opTime)
}

// write-wait-leave networkName tableName numParallelWriters writeTimeSec
@@ -557,8 +703,9 @@ func doWriteWaitLeave(ips []string, args []string) {

// check table entries for 2 minutes
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
cancel()
fmt.Fprintf(os.Stderr, "doWriteWaitLeave succeeded in %d msec", opTime)
}

// write-wait-leave-join networkName tableName numParallelWriters writeTimeSec numParallelLeaver
@@ -606,8 +753,9 @@ func doWriteWaitLeaveJoin(ips []string, args []string) {

// check table entries for 2 minutes
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber)
opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber)
cancel()
fmt.Fprintf(os.Stderr, "doWriteWaitLeaveJoin succeeded in %d msec", opTime)
}

var cmdArgChec = map[string]int{
@@ -666,9 +814,21 @@ func Client(args []string) {
// leave-network networkName
doLeaveNetwork(ips, commandArgs)
case "network-peers":
// network-peers networkName maxRetry
// network-peers networkName expectedNumberPeers maxRetry
doNetworkPeers(ips, commandArgs)

// case "network-stats-entries":
// // network-stats-entries networkName maxRetry
// doNetworkPeers(ips, commandArgs)
case "network-stats-queue":
// network-stats-queue networkName <lt/gt> queueSize
doNetworkStatsQueue(ips, commandArgs)

case "write-keys":
// write-keys networkName tableName parallelWriters numberOfKeysEach
doWriteKeys(ips, commandArgs)
case "delete-keys":
// delete-keys networkName tableName parallelWriters numberOfKeysEach
doDeleteKeys(ips, commandArgs)
case "write-unique-keys":
// write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
doWriteUniqueKeys(ips, commandArgs)
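
The new write-keys and delete-keys commands dispatched above reuse the test client's fan-out/fan-in pattern: one goroutine per node writes (or deletes) its batch of keys and reports a count on a buffered channel, the caller drains one result per writer, and checkTable then verifies convergence. The following is a minimal, self-contained sketch of that pattern; the names (result, writeKeys, the node IPs) are illustrative stand-ins, not the actual helpers in ndbClient.go.

package main

import (
	"fmt"
	"strconv"
)

// result mirrors the shape of resultTuple: which node answered and how many
// keys it processed.
type result struct {
	id    string
	count int
}

// writeKeys stands in for writeKeysNumber: it derives numberOfKeys keys from a
// per-worker prefix and reports how many it handled on doneCh.
func writeKeys(node, prefix string, numberOfKeys int, doneCh chan result) {
	x := 0
	for ; x < numberOfKeys; x++ {
		_ = prefix + strconv.Itoa(x) // the key that would be written to the table
	}
	doneCh <- result{id: node, count: x}
}

func main() {
	nodes := []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}
	doneCh := make(chan result, len(nodes)) // buffered so no writer blocks on send

	// Fan out: one writer per node, each with a unique key prefix.
	for i, node := range nodes {
		go writeKeys(node, "key-"+strconv.Itoa(i)+"-", 100, doneCh)
	}

	// Fan in: drain exactly one result per writer and sum the totals,
	// the same role waitWriters plays in the test client.
	total := 0
	for range nodes {
		r := <-doneCh
		total += r.count
	}
	close(doneCh)
	fmt.Printf("written a total of %d keys\n", total)
}
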
4 changes: 4 additions & 0 deletions cmd/networkdb-test/testMain.go
@@ -10,6 +10,10 @@ import (
)

func main() {
formatter := &logrus.TextFormatter{
FullTimestamp: true,
}
logrus.SetFormatter(formatter)
logrus.Infof("Starting the image with these args: %v", os.Args)
if len(os.Args) < 1 {
log.Fatal("You need at least 1 argument [client/server]")
10 changes: 10 additions & 0 deletions diagnostic/types.go
@@ -120,3 +120,13 @@ type TablePeersResult struct {
TableObj
Elements []PeerEntryObj `json:"entries"`
}

// NetworkStatsResult network db stats related to entries and queue len for a network
type NetworkStatsResult struct {
Entries int `json:"entries"`
QueueLen int `json:"qlen"`
}

func (n *NetworkStatsResult) String() string {
return fmt.Sprintf("entries: %d, qlen: %d\n", n.Entries, n.QueueLen)
}
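
The test client's dbEntriesNumber and dbQueueLength depend on this String() layout: they fetch /networkstats and pull the numbers back out with the regexes "entries: ([0-9]+)" and "qlen: ([0-9]+)". The handler that writes the struct is not shown in this diff, so the coupling is inferred from the format string and the patterns. A standalone round-trip sketch of that format/parse pair:

package main

import (
	"fmt"
	"regexp"
	"strconv"
)

func main() {
	// Same layout as NetworkStatsResult.String() produces.
	body := fmt.Sprintf("entries: %d, qlen: %d\n", 42, 7)

	// Same patterns the dbclient applies to the /networkstats body.
	entriesRe := regexp.MustCompile(`entries: ([0-9]+)`)
	qlenRe := regexp.MustCompile(`qlen: ([0-9]+)`)

	entries, _ := strconv.Atoi(entriesRe.FindStringSubmatch(body)[1])
	qlen, _ := strconv.Atoi(qlenRe.FindStringSubmatch(body)[1])
	fmt.Println(entries, qlen) // prints: 42 7
}
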
2 changes: 0 additions & 2 deletions networkdb/broadcast.go
@@ -110,7 +110,6 @@ type tableEventMessage struct {
tname string
key string
msg []byte
node string
}

func (m *tableEventMessage) Invalidates(other memberlist.Broadcast) bool {
@@ -168,7 +167,6 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
id: nid,
tname: tname,
key: key,
node: nDB.config.NodeID,
})
return nil
}