From 85b74eebcb6bb76c3ae90343c801c66765de890d Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Tue, 26 Jun 2018 21:20:57 -0700 Subject: [PATCH 1/2] Enhance testing infra Allow to write and delete X number of entries Allow to query the queue length Signed-off-by: Flavio Crisciani (cherry picked from commit dd5a9eaba3974e06f7b678c6cc02c58d2e18dd30) Signed-off-by: Sebastiaan van Stijn --- cmd/networkdb-test/dbclient/ndbClient.go | 206 ++++++++++++++++++++--- cmd/networkdb-test/testMain.go | 4 + diagnostic/types.go | 10 ++ networkdb/networkdbdiagnostic.go | 39 +++++ 4 files changed, 236 insertions(+), 23 deletions(-) diff --git a/cmd/networkdb-test/dbclient/ndbClient.go b/cmd/networkdb-test/dbclient/ndbClient.go index e2574fc3cd..1b05e714c0 100644 --- a/cmd/networkdb-test/dbclient/ndbClient.go +++ b/cmd/networkdb-test/dbclient/ndbClient.go @@ -2,6 +2,7 @@ package dbclient import ( "context" + "fmt" "io/ioutil" "log" "net" @@ -25,17 +26,10 @@ type resultTuple struct { } func httpGetFatalError(ip, port, path string) { - // for { body, err := httpGet(ip, port, path) if err != nil || !strings.Contains(string(body), "OK") { - // if strings.Contains(err.Error(), "EOF") { - // logrus.Warnf("Got EOF path:%s err:%s", path, err) - // continue - // } log.Fatalf("[%s] error %s %s", path, err, body) } - // break - // } } func httpGet(ip, port, path string) ([]byte, error) { @@ -87,7 +81,7 @@ func clusterPeersNumber(ip, port string, doneCh chan resultTuple) { body, err := httpGet(ip, port, "/clusterpeers") if err != nil { - logrus.Errorf("clusterPeers %s there was an error: %s\n", ip, err) + logrus.Errorf("clusterPeers %s there was an error: %s", ip, err) doneCh <- resultTuple{id: ip, result: -1} return } @@ -101,7 +95,7 @@ func networkPeersNumber(ip, port, networkName string, doneCh chan resultTuple) { body, err := httpGet(ip, port, "/networkpeers?nid="+networkName) if err != nil { - logrus.Errorf("networkPeersNumber %s there was an error: %s\n", ip, err) + logrus.Errorf("networkPeersNumber %s there was an error: %s", ip, err) doneCh <- resultTuple{id: ip, result: -1} return } @@ -115,7 +109,7 @@ func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan r body, err := httpGet(ip, port, "/gettable?nid="+networkName+"&tname="+tableName) if err != nil { - logrus.Errorf("tableEntriesNumber %s there was an error: %s\n", ip, err) + logrus.Errorf("tableEntriesNumber %s there was an error: %s", ip, err) doneCh <- resultTuple{id: ip, result: -1} return } @@ -124,6 +118,32 @@ func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan r doneCh <- resultTuple{id: ip, result: entriesNum} } +func dbEntriesNumber(ip, port, networkName string, doneCh chan resultTuple) { + body, err := httpGet(ip, port, "/networkstats?nid="+networkName) + + if err != nil { + logrus.Errorf("entriesNumber %s there was an error: %s", ip, err) + doneCh <- resultTuple{id: ip, result: -1} + return + } + elementsRegexp := regexp.MustCompile(`entries: ([0-9]+)`) + entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1]) + doneCh <- resultTuple{id: ip, result: entriesNum} +} + +func dbQueueLength(ip, port, networkName string, doneCh chan resultTuple) { + body, err := httpGet(ip, port, "/networkstats?nid="+networkName) + + if err != nil { + logrus.Errorf("queueLength %s there was an error: %s", ip, err) + doneCh <- resultTuple{id: ip, result: -1} + return + } + elementsRegexp := regexp.MustCompile(`qlen: ([0-9]+)`) + entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1]) + doneCh <- resultTuple{id: ip, result: entriesNum} +} + func clientWatchTable(ip, port, networkName, tableName string, doneCh chan resultTuple) { httpGetFatalError(ip, port, "/watchtable?nid="+networkName+"&tname="+tableName) if doneCh != nil { @@ -135,7 +155,7 @@ func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh ch body, err := httpGet(ip, port, "/watchedtableentries?nid="+networkName+"&tname="+tableName) if err != nil { - logrus.Errorf("clientTableEntriesNumber %s there was an error: %s\n", ip, err) + logrus.Errorf("clientTableEntriesNumber %s there was an error: %s", ip, err) doneCh <- resultTuple{id: ip, result: -1} return } @@ -144,6 +164,26 @@ func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh ch doneCh <- resultTuple{id: ip, result: entriesNum} } +func writeKeysNumber(ip, port, networkName, tableName, key string, number int, doneCh chan resultTuple) { + x := 0 + for ; x < number; x++ { + k := key + strconv.Itoa(x) + // write key + writeTableKey(ip, port, networkName, tableName, k) + } + doneCh <- resultTuple{id: ip, result: x} +} + +func deleteKeysNumber(ip, port, networkName, tableName, key string, number int, doneCh chan resultTuple) { + x := 0 + for ; x < number; x++ { + k := key + strconv.Itoa(x) + // write key + deleteTableKey(ip, port, networkName, tableName, k) + } + doneCh <- resultTuple{id: ip, result: x} +} + func writeUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) { for x := 0; ; x++ { select { @@ -215,17 +255,18 @@ func ready(ip, port string, doneCh chan resultTuple) { doneCh <- resultTuple{id: ip, result: 0} } -func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) { +func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) (opTime time.Duration) { startTime := time.Now().UnixNano() var successTime int64 - // Loop for 2 minutes to guartee that the result is stable + // Loop for 2 minutes to guarantee that the result is stable for { select { case <-ctx.Done(): // Validate test success, if the time is set means that all the tables are empty if successTime != 0 { - logrus.Infof("Check table passed, the cluster converged in %d msec", time.Duration(successTime-startTime)/time.Millisecond) + opTime = time.Duration(successTime-startTime) / time.Millisecond + logrus.Infof("Check table passed, the cluster converged in %d msec", opTime) return } log.Fatal("Test failed, there is still entries in the tables of the nodes") @@ -383,6 +424,107 @@ func doNetworkPeers(ips []string, args []string) { close(doneCh) } +// network-stats-queue networkName queueSize +func doNetworkStatsQueue(ips []string, args []string) { + doneCh := make(chan resultTuple, len(ips)) + networkName := args[0] + comparison := args[1] + size, _ := strconv.Atoi(args[2]) + + // check all the nodes + for _, ip := range ips { + go dbQueueLength(ip, servicePort, networkName, doneCh) + } + + var avgQueueSize int + // wait for the readiness of all nodes + for i := len(ips); i > 0; i-- { + node := <-doneCh + switch comparison { + case "lt": + if node.result > size { + log.Fatalf("Expected queue size from %s to be %d < %d", node.id, node.result, size) + } + case "gt": + if node.result < size { + log.Fatalf("Expected queue size from %s to be %d > %d", node.id, node.result, size) + } + default: + log.Fatal("unknown comparison operator") + } + avgQueueSize += node.result + } + close(doneCh) + avgQueueSize /= len(ips) + fmt.Fprintf(os.Stderr, "doNetworkStatsQueue succeeded with avg queue:%d", avgQueueSize) +} + +// write-keys networkName tableName parallelWriters numberOfKeysEach +func doWriteKeys(ips []string, args []string) { + networkName := args[0] + tableName := args[1] + parallelWriters, _ := strconv.Atoi(args[2]) + numberOfKeys, _ := strconv.Atoi(args[3]) + + doneCh := make(chan resultTuple, parallelWriters) + // Enable watch of tables from clients + for i := 0; i < parallelWriters; i++ { + go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh) + } + waitWriters(parallelWriters, false, doneCh) + + // Start parallel writers that will create and delete unique keys + defer close(doneCh) + for i := 0; i < parallelWriters; i++ { + key := "key-" + strconv.Itoa(i) + "-" + logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i]) + go writeKeysNumber(ips[i], servicePort, networkName, tableName, key, numberOfKeys, doneCh) + } + + // Sync with all the writers + keyMap := waitWriters(parallelWriters, true, doneCh) + logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys]) + + // check table entries for 2 minutes + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber) + cancel() + fmt.Fprintf(os.Stderr, "doWriteKeys succeeded in %d msec", opTime) +} + +// delete-keys networkName tableName parallelWriters numberOfKeysEach +func doDeleteKeys(ips []string, args []string) { + networkName := args[0] + tableName := args[1] + parallelWriters, _ := strconv.Atoi(args[2]) + numberOfKeys, _ := strconv.Atoi(args[3]) + + doneCh := make(chan resultTuple, parallelWriters) + // Enable watch of tables from clients + for i := 0; i < parallelWriters; i++ { + go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh) + } + waitWriters(parallelWriters, false, doneCh) + + // Start parallel writers that will create and delete unique keys + defer close(doneCh) + for i := 0; i < parallelWriters; i++ { + key := "key-" + strconv.Itoa(i) + "-" + logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i]) + go deleteKeysNumber(ips[i], servicePort, networkName, tableName, key, numberOfKeys, doneCh) + } + + // Sync with all the writers + keyMap := waitWriters(parallelWriters, true, doneCh) + logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys]) + + // check table entries for 2 minutes + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber) + cancel() + fmt.Fprintf(os.Stderr, "doDeletekeys succeeded in %d msec", opTime) +} + // write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec func doWriteDeleteUniqueKeys(ips []string, args []string) { networkName := args[0] @@ -412,11 +554,12 @@ func doWriteDeleteUniqueKeys(ips []string, args []string) { // check table entries for 2 minutes ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) - checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber) + opDBTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber) cancel() ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) - checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber) + opClientTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber) cancel() + fmt.Fprintf(os.Stderr, "doWriteDeleteUniqueKeys succeeded in %d msec and client %d msec", opDBTime, opClientTime) } // write-unique-keys networkName tableName numParallelWriters writeTimeSec @@ -449,8 +592,9 @@ func doWriteUniqueKeys(ips []string, args []string) { // check table entries for 2 minutes ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) - checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber) + opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber) cancel() + fmt.Fprintf(os.Stderr, "doWriteUniqueKeys succeeded in %d msec", opTime) } // write-delete-leave-join networkName tableName numParallelWriters writeTimeSec @@ -477,8 +621,9 @@ func doWriteDeleteLeaveJoin(ips []string, args []string) { // check table entries for 2 minutes ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) - checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber) + opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber) cancel() + fmt.Fprintf(os.Stderr, "doWriteDeleteLeaveJoin succeeded in %d msec", opTime) } // write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec @@ -522,8 +667,9 @@ func doWriteDeleteWaitLeaveJoin(ips []string, args []string) { // check table entries for 2 minutes ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) - checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber) + opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber) cancel() + fmt.Fprintf(os.Stderr, "doWriteDeleteWaitLeaveJoin succeeded in %d msec", opTime) } // write-wait-leave networkName tableName numParallelWriters writeTimeSec @@ -557,8 +703,9 @@ func doWriteWaitLeave(ips []string, args []string) { // check table entries for 2 minutes ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) - checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber) + opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber) cancel() + fmt.Fprintf(os.Stderr, "doWriteLeaveJoin succeeded in %d msec", opTime) } // write-wait-leave-join networkName tableName numParallelWriters writeTimeSec numParallelLeaver @@ -606,8 +753,9 @@ func doWriteWaitLeaveJoin(ips []string, args []string) { // check table entries for 2 minutes ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute) - checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber) + opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber) cancel() + fmt.Fprintf(os.Stderr, "doWriteWaitLeaveJoin succeeded in %d msec", opTime) } var cmdArgChec = map[string]int{ @@ -666,9 +814,21 @@ func Client(args []string) { // leave-network networkName doLeaveNetwork(ips, commandArgs) case "network-peers": - // network-peers networkName maxRetry + // network-peers networkName expectedNumberPeers maxRetry doNetworkPeers(ips, commandArgs) - + // case "network-stats-entries": + // // network-stats-entries networkName maxRetry + // doNetworkPeers(ips, commandArgs) + case "network-stats-queue": + // network-stats-queue networkName queueSize + doNetworkStatsQueue(ips, commandArgs) + + case "write-keys": + // write-keys networkName tableName parallelWriters numberOfKeysEach + doWriteKeys(ips, commandArgs) + case "delete-keys": + // delete-keys networkName tableName parallelWriters numberOfKeysEach + doDeleteKeys(ips, commandArgs) case "write-unique-keys": // write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec doWriteUniqueKeys(ips, commandArgs) diff --git a/cmd/networkdb-test/testMain.go b/cmd/networkdb-test/testMain.go index 0cd8c29942..76cc406af1 100644 --- a/cmd/networkdb-test/testMain.go +++ b/cmd/networkdb-test/testMain.go @@ -10,6 +10,10 @@ import ( ) func main() { + formatter := &logrus.TextFormatter{ + FullTimestamp: true, + } + logrus.SetFormatter(formatter) logrus.Infof("Starting the image with these args: %v", os.Args) if len(os.Args) < 1 { log.Fatal("You need at least 1 argument [client/server]") diff --git a/diagnostic/types.go b/diagnostic/types.go index 4eb4ca0d9f..e6b4831263 100644 --- a/diagnostic/types.go +++ b/diagnostic/types.go @@ -120,3 +120,13 @@ type TablePeersResult struct { TableObj Elements []PeerEntryObj `json:"entries"` } + +// NetworkStatsResult network db stats related to entries and queue len for a network +type NetworkStatsResult struct { + Entries int `json:"entries"` + QueueLen int `jsoin:"qlen"` +} + +func (n *NetworkStatsResult) String() string { + return fmt.Sprintf("entries: %d, qlen: %d\n", n.Entries, n.QueueLen) +} diff --git a/networkdb/networkdbdiagnostic.go b/networkdb/networkdbdiagnostic.go index ffeb98d607..a0e9598799 100644 --- a/networkdb/networkdbdiagnostic.go +++ b/networkdb/networkdbdiagnostic.go @@ -28,6 +28,7 @@ var NetDbPaths2Func = map[string]diagnostic.HTTPHandlerFunc{ "/deleteentry": dbDeleteEntry, "/getentry": dbGetEntry, "/gettable": dbGetTable, + "/networkstats": dbNetworkStats, } func dbJoin(ctx interface{}, w http.ResponseWriter, r *http.Request) { @@ -411,3 +412,41 @@ func dbGetTable(ctx interface{}, w http.ResponseWriter, r *http.Request) { } diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json) } + +func dbNetworkStats(ctx interface{}, w http.ResponseWriter, r *http.Request) { + r.ParseForm() + diagnostic.DebugHTTPForm(r) + _, json := diagnostic.ParseHTTPFormOptions(r) + + // audit logs + log := logrus.WithFields(logrus.Fields{"component": "diagnostic", "remoteIP": r.RemoteAddr, "method": common.CallerName(0), "url": r.URL.String()}) + log.Info("network stats") + + if len(r.Form["nid"]) < 1 { + rsp := diagnostic.WrongCommand(missingParameter, fmt.Sprintf("%s?nid=test", r.URL.Path)) + log.Error("network stats failed, wrong input") + diagnostic.HTTPReply(w, rsp, json) + return + } + + nDB, ok := ctx.(*NetworkDB) + if ok { + nDB.RLock() + networks := nDB.networks[nDB.config.NodeID] + network, ok := networks[r.Form["nid"][0]] + + entries := -1 + qLen := -1 + if ok { + entries = network.entriesNumber + qLen = network.tableBroadcasts.NumQueued() + } + nDB.RUnlock() + + rsp := diagnostic.CommandSucceed(&diagnostic.NetworkStatsResult{Entries: entries, QueueLen: qLen}) + log.WithField("response", fmt.Sprintf("%+v", rsp)).Info("network stats done") + diagnostic.HTTPReply(w, rsp, json) + return + } + diagnostic.HTTPReply(w, diagnostic.FailCommand(fmt.Errorf("%s", dbNotAvailable)), json) +} From 07ba6c3d9ca44312810c88121123c504f4f12744 Mon Sep 17 00:00:00 2001 From: Flavio Crisciani Date: Mon, 2 Jul 2018 16:36:19 -0700 Subject: [PATCH 2/2] Optimize networkDB queue Added some optimizations to reduce the messages in the queue: 1) on join network the node execute a tcp sync with all the nodes that it is aware part of the specific network. During this time before the node was redistributing all the entries. This meant that if the network had 10K entries the queue of the joining node will jump to 10K. The fix adds a flag on the network that would avoid to insert any entry in the queue till the sync happens. Note that right now the flag is set in a best effort way, there is no real check if at least one of the nodes succeed. 2) limit the number of messages to redistribute coming from a TCP sync. Introduced a threshold that limit the number of messages that are propagated, this will disable this optimization in case of heavy load. Signed-off-by: Flavio Crisciani (cherry picked from commit 5ed38221164e04c78d20f17839aab52fafd7fe88) Signed-off-by: Sebastiaan van Stijn --- networkdb/broadcast.go | 2 -- networkdb/cluster.go | 22 +++++++++++++++------- networkdb/delegate.go | 34 ++++++++++++++++++++++++++-------- networkdb/networkdb.go | 10 ++++++++++ 4 files changed, 51 insertions(+), 17 deletions(-) diff --git a/networkdb/broadcast.go b/networkdb/broadcast.go index 174023b22b..efcfcc2426 100644 --- a/networkdb/broadcast.go +++ b/networkdb/broadcast.go @@ -110,7 +110,6 @@ type tableEventMessage struct { tname string key string msg []byte - node string } func (m *tableEventMessage) Invalidates(other memberlist.Broadcast) bool { @@ -168,7 +167,6 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st id: nid, tname: tname, key: key, - node: nDB.config.NodeID, }) return nil } diff --git a/networkdb/cluster.go b/networkdb/cluster.go index bd48fb9f18..2584dbcb2d 100644 --- a/networkdb/cluster.go +++ b/networkdb/cluster.go @@ -24,6 +24,9 @@ const ( retryInterval = 1 * time.Second nodeReapInterval = 24 * time.Hour nodeReapPeriod = 2 * time.Hour + // considering a cluster with > 20 nodes and a drain speed of 100 msg/s + // the following is roughly 1 minute + maxQueueLenBroadcastOnSync = 500 ) type logWriter struct{} @@ -555,6 +558,7 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) { var err error var networks []string + var success bool for _, node := range nodes { if node == nDB.config.NodeID { continue @@ -562,21 +566,25 @@ func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) { logrus.Debugf("%v(%v): Initiating bulk sync with node %v", nDB.config.Hostname, nDB.config.NodeID, node) networks = nDB.findCommonNetworks(node) err = nDB.bulkSyncNode(networks, node, true) - // if its periodic bulksync stop after the first successful sync - if !all && err == nil { - break - } if err != nil { err = fmt.Errorf("bulk sync to node %s failed: %v", node, err) logrus.Warn(err.Error()) + } else { + // bulk sync succeeded + success = true + // if its periodic bulksync stop after the first successful sync + if !all { + break + } } } - if err != nil { - return nil, err + if success { + // if at least one node sync succeeded + return networks, nil } - return networks, nil + return nil, err } // Bulk sync all the table entries belonging to a set of networks to a diff --git a/networkdb/delegate.go b/networkdb/delegate.go index 6cd827ee26..14e19bbdd7 100644 --- a/networkdb/delegate.go +++ b/networkdb/delegate.go @@ -142,7 +142,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { return true } -func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { +func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent, isBulkSync bool) bool { // Update our local clock if the received messages has newer time. nDB.tableClock.Witness(tEvent.LTime) @@ -175,6 +175,14 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { nDB.Unlock() return false } + } else if tEvent.Type == TableEventTypeDelete && !isBulkSync { + nDB.Unlock() + // We don't know the entry, the entry is being deleted and the message is an async message + // In this case the safest approach is to ignore it, it is possible that the queue grew so much to + // exceed the garbage collection time (the residual reap time that is in the message is not being + // updated, to avoid inserting too many messages in the queue). + // Instead the messages coming from TCP bulk sync are safe with the latest value for the garbage collection time + return false } e = &entry{ @@ -197,11 +205,17 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { nDB.Unlock() if err != nil && tEvent.Type == TableEventTypeDelete { - // If it is a delete event and we did not have a state for it, don't propagate to the application + // Again we don't know the entry but this is coming from a TCP sync so the message body is up to date. + // We had saved the state so to speed up convergence and be able to avoid accepting create events. + // Now we will rebroadcast the message if 2 conditions are met: + // 1) we had already synced this network (during the network join) + // 2) the residual reapTime is higher than 1/6 of the total reapTime. // If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around - // most likely the cluster is already aware of it, if not who will sync with this node will catch the state too. - // This also avoids that deletion of entries close to their garbage collection ends up circuling around forever - return e.reapTime > nDB.config.reapEntryInterval/6 + // most likely the cluster is already aware of it + // This also reduce the possibility that deletion of entries close to their garbage collection ends up circuling around + // forever + //logrus.Infof("exiting on delete not knowing the obj with rebroadcast:%t", network.inSync) + return network.inSync && e.reapTime > nDB.config.reapEntryInterval/6 } var op opType @@ -215,7 +229,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { } nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value)) - return true + return network.inSync } func (nDB *NetworkDB) handleCompound(buf []byte, isBulkSync bool) { @@ -244,7 +258,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) { return } - if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast { + if rebroadcast := nDB.handleTableEvent(&tEvent, isBulkSync); rebroadcast { var err error buf, err = encodeRawMessage(MessageTypeTableEvent, buf) if err != nil { @@ -261,12 +275,16 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) { return } + // if the queue is over the threshold, avoid distributing information coming from TCP sync + if isBulkSync && n.tableBroadcasts.NumQueued() > maxQueueLenBroadcastOnSync { + return + } + n.tableBroadcasts.QueueBroadcast(&tableEventMessage{ msg: buf, id: tEvent.NetworkID, tname: tEvent.TableName, key: tEvent.Key, - node: tEvent.NodeName, }) } } diff --git a/networkdb/networkdb.go b/networkdb/networkdb.go index c433913a46..a5512e6320 100644 --- a/networkdb/networkdb.go +++ b/networkdb/networkdb.go @@ -131,6 +131,9 @@ type network struct { // Lamport time for the latest state of the entry. ltime serf.LamportTime + // Gets set to true after the first bulk sync happens + inSync bool + // Node leave is in progress. leaving bool @@ -619,6 +622,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { } nDB.addNetworkNode(nid, nDB.config.NodeID) networkNodes := nDB.networkNodes[nid] + n = nodeNetworks[nid] nDB.Unlock() if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil { @@ -630,6 +634,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err) } + // Mark the network as being synced + // note this is a best effort, we are not checking the result of the bulk sync + nDB.Lock() + n.inSync = true + nDB.Unlock() + return nil }