From e79f6e20fbe225e5aad8b0c9e70578356fce9573 Mon Sep 17 00:00:00 2001
From: anhthii
Date: Wed, 2 Apr 2025 19:28:28 +0700
Subject: [PATCH 1/2] Handle duplicate messages

---
 cmd/main.go                   |  4 ++
 pkg/eventconsumer/consumer.go | 96 ++++++++++++++++++++++++++++++++++-
 pkg/messaging/pubsub.go       |  1 +
 pkg/mpc/node.go               |  1 -
 4 files changed, 100 insertions(+), 2 deletions(-)

diff --git a/cmd/main.go b/cmd/main.go
index 6690ad2..e6088ff 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -105,7 +105,11 @@ func main() {
 	timeoutConsumer.Run()
 	defer timeoutConsumer.Close()
 
 	signingCounsumer := eventconsumer.NewSigningConsumer(natsConn, signingStream, pubsub)
+
+	// Make the node ready before starting the signing consumer
+	peerRegistry.Ready()
+
 	signingCounsumer.Run()
 	defer signingCounsumer.Close()
 
diff --git a/pkg/eventconsumer/consumer.go b/pkg/eventconsumer/consumer.go
index a25e86d..a925fe7 100644
--- a/pkg/eventconsumer/consumer.go
+++ b/pkg/eventconsumer/consumer.go
@@ -35,6 +35,13 @@ type eventConsumer struct {
 
 	keyGenerationSub messaging.Subscription
 	signingSub       messaging.Subscription
+
+	// Track active sessions with timestamps for cleanup
+	activeSessions  map[string]time.Time // Maps "walletID-txID" to creation time
+	sessionsLock    sync.RWMutex
+	cleanupInterval time.Duration // How often to run cleanup
+	sessionTimeout  time.Duration // How long before a session is considered stale
+	cleanupStopChan chan struct{} // Signal to stop the cleanup goroutine
 }
 
 func NewEventConsumer(
@@ -43,12 +50,21 @@ func NewEventConsumer(
 	genKeySucecssQueue messaging.MessageQueue,
 	signingResultQueue messaging.MessageQueue,
 ) EventConsumer {
-	return &eventConsumer{
+	ec := &eventConsumer{
 		node:               node,
 		pubsub:             pubsub,
 		genKeySucecssQueue: genKeySucecssQueue,
 		signingResultQueue: signingResultQueue,
+		activeSessions:     make(map[string]time.Time),
+		cleanupInterval:    5 * time.Minute,  // Run cleanup every 5 minutes
+		sessionTimeout:     30 * time.Minute, // Consider sessions older than 30 minutes stale
+		cleanupStopChan:    make(chan struct{}),
 	}
+
+	// Start the background cleanup goroutine
+	go ec.sessionCleanupRoutine()
+
+	return ec
 }
 
 func (ec *eventConsumer) Run() {
@@ -175,6 +191,11 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {
 		logger.Info("Received signing event", "walletID", msg.WalletID, "type", msg.KeyType, "tx", msg.TxID)
 		threshold := 1
+
+		// Check for duplicate session and track if new
+		if ec.checkDuplicateSession(msg.WalletID, msg.TxID) {
+			return
+		}
 
 		var session mpc.ISigningSession
 		switch msg.KeyType {
 		case KeyTypeSecp256k1:
@@ -196,6 +217,8 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {
 
 		}
 
+		ec.addSession(msg.WalletID, msg.TxID)
+
 		if err != nil {
 			ec.handleSigningSessionError(msg.WalletID, msg.TxID, msg.NetworkInternalCode, err, "Failed to create signing session", natMsg)
 			return
@@ -271,8 +294,79 @@ func (ec *eventConsumer) handleSigningSessionError(walletID, txID, NetworkIntern
 	}
 }
 
+// sessionCleanupRoutine periodically triggers cleanup of stale sessions
+func (ec *eventConsumer) sessionCleanupRoutine() {
+	ticker := time.NewTicker(ec.cleanupInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			ec.cleanupStaleSessions()
+		case <-ec.cleanupStopChan:
+			return
+		}
+	}
+}
+
+// cleanupStaleSessions removes sessions older than sessionTimeout
+func (ec *eventConsumer) cleanupStaleSessions() {
+	now := time.Now()
+	ec.sessionsLock.Lock()
+	defer ec.sessionsLock.Unlock()
+
+	for sessionID, creationTime := range ec.activeSessions {
+		if now.Sub(creationTime) > ec.sessionTimeout {
+			logger.Info("Cleaning up stale session", "sessionID", sessionID, "age", now.Sub(creationTime))
+			delete(ec.activeSessions, sessionID)
+		}
+	}
+}
+
+// addSession marks a session as active with the current timestamp
+func (ec *eventConsumer) addSession(walletID, txID string) {
+	sessionID := fmt.Sprintf("%s-%s", walletID, txID)
+	ec.sessionsLock.Lock()
+	ec.activeSessions[sessionID] = time.Now()
+	ec.sessionsLock.Unlock()
+}
+
+// removeSession removes a session from tracking
+func (ec *eventConsumer) removeSession(walletID, txID string) {
+	sessionID := fmt.Sprintf("%s-%s", walletID, txID)
+	ec.sessionsLock.Lock()
+	delete(ec.activeSessions, sessionID)
+	ec.sessionsLock.Unlock()
+}
+
+// checkDuplicateSession checks if a session already exists and tracks it if new.
+// Returns true if the session is a duplicate.
+func (ec *eventConsumer) checkDuplicateSession(walletID, txID string) bool {
+	sessionID := fmt.Sprintf("%s-%s", walletID, txID)
+
+	// Check for duplicate
+	ec.sessionsLock.RLock()
+	_, isDuplicate := ec.activeSessions[sessionID]
+	ec.sessionsLock.RUnlock()
+
+	if isDuplicate {
+		logger.Info("Duplicate signing request detected", "walletID", walletID, "txID", txID)
+		return true
+	}
+
+	// Mark as active
+	ec.sessionsLock.Lock()
+	ec.activeSessions[sessionID] = time.Now()
+	ec.sessionsLock.Unlock()
+
+	return false
+}
+
 // Close and clean up
 func (ec *eventConsumer) Close() error {
+	// Signal the cleanup routine to stop
+	close(ec.cleanupStopChan)
+
 	err := ec.keyGenerationSub.Unsubscribe()
 	if err != nil {
 		return err

diff --git a/pkg/messaging/pubsub.go b/pkg/messaging/pubsub.go
index 9f3a9c7..f8fb221 100644
--- a/pkg/messaging/pubsub.go
+++ b/pkg/messaging/pubsub.go
@@ -229,6 +229,7 @@ func (j *jetStreamPubSub) Subscribe(name string, topic string, handler func(msg
 		BackOff:       []time.Duration{60 * time.Second, 60 * time.Second, 60 * time.Second},
 		DeliverPolicy: jetstream.DeliverAllPolicy, // Deliver all messages
 		FilterSubject: topic,
+		AckWait:       30 * time.Second, // explicitly set the ack wait
 	}
 
 	logger.Info("Creating consumer", "config", consumerConfig, "stream", j.config.streamName)

diff --git a/pkg/mpc/node.go b/pkg/mpc/node.go
index 10a647d..e904185 100644
--- a/pkg/mpc/node.go
+++ b/pkg/mpc/node.go
@@ -68,7 +68,6 @@ func NewNode(
 	}
 	logger.Info("Starting new node, preparams is generated successfully!")
 
-	peerRegistry.Ready()
 	go peerRegistry.WatchPeersReady()
 
 	return &Node{

From a9192ca11581dd986bdd21728cbda4b78d75a753 Mon Sep 17 00:00:00 2001
From: anhthii
Date: Wed, 2 Apr 2025 21:21:32 +0700
Subject: [PATCH 2/2] Fix bug where signing doesn't work after all nodes are back up

---
 pkg/eventconsumer/consumer.go    | 29 ++++++++++++++---------------
 pkg/logger/logger.go             |  2 +-
 pkg/messaging/pubsub.go          |  2 +-
 pkg/mpc/ecdsa_signing_session.go |  3 +--
 pkg/mpc/eddsa_signing_session.go |  3 ++-
 pkg/mpc/session.go               |  4 ++++
 6 files changed, 23 insertions(+), 20 deletions(-)

diff --git a/pkg/eventconsumer/consumer.go b/pkg/eventconsumer/consumer.go
index a925fe7..b1de244 100644
--- a/pkg/eventconsumer/consumer.go
+++ b/pkg/eventconsumer/consumer.go
@@ -3,6 +3,7 @@ package eventconsumer
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"log"
 	"math/big"
@@ -145,9 +146,6 @@ func (ec *eventConsumer) consumeKeyGenerationEvent() error {
 		go eddsaSession.GenerateKey(doneEddsa)
 		wg.Wait()
 
-		if err != nil {
-			logger.Error("Errors when closing sessions", err)
-		}
 		logger.Info("Closing session successfully!", "event", successEvent)
 
 		successEventBytes, err := json.Marshal(successEvent)
@@ -165,9 +163,6 @@ func (ec *eventConsumer) consumeKeyGenerationEvent() error {
 		}
 
 		logger.Info("[COMPLETED KEY GEN] Key generation completed successfully", "walletID", walletID)
-		if err != nil {
-			logger.Error("Failed to close session", err)
-		}
 	})

@@ -193,6 +188,7 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {
 
 		// Check for duplicate session and track if new
 		if ec.checkDuplicateSession(msg.WalletID, msg.TxID) {
+			natMsg.Term()
 			return
 		}
 
@@ -217,8 +213,6 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {
 
 		}
 
-		ec.addSession(msg.WalletID, msg.TxID)
-
 		if err != nil {
 			ec.handleSigningSessionError(msg.WalletID, msg.TxID, msg.NetworkInternalCode, err, "Failed to create signing session", natMsg)
 			return
@@ -227,13 +221,17 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {
 		txBigInt := new(big.Int).SetBytes(msg.Tx)
 		err = session.Init(txBigInt)
 		if err != nil {
-			if err.Error() == "Not enough participants to sign" {
+			if errors.Is(err, mpc.ErrNotEnoughParticipants) {
+				logger.Info("RETRY LATER: Not enough participants to sign")
 				// Return for retry later
 				return
 			}
 			return
 		}
 
+		// Mark the session as active so redelivered events are treated as duplicates
+		ec.addSession(msg.WalletID, msg.TxID)
+
 		ctx, done := context.WithCancel(context.Background())
 		go func() {
 			for {
@@ -251,7 +249,13 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {
 		}()
 
 		session.ListenToIncomingMessageAsync()
-		// TODO: use consul distributed lock here
+		// TODO: use a Consul distributed lock here; only sign after all nodes have finished
+		// setting up their incoming-message listeners. The sleep ensures this node's listeners
+		// are in place before the signing process starts. If signing starts sending messages
+		// before the other nodes have set up their listeners, those messages may be missed,
+		// potentially causing the signing process to fail. One possible solution: the messaging
+		// layer includes mechanisms for direct point-to-point communication (in point2point.go);
+		// nodes could explicitly coordinate through request-response patterns before starting to sign.
 		time.Sleep(1 * time.Second)
 		go session.Sign(done, natMsg) // use a goroutine so as not to block the event subscriber
 	})
@@ -354,11 +358,6 @@ func (ec *eventConsumer) checkDuplicateSession(walletID, txID string) bool {
 		logger.Info("Duplicate signing request detected", "walletID", walletID, "txID", txID)
 		return true
 	}
 
-	// Mark as active
-	ec.sessionsLock.Lock()
-	ec.activeSessions[sessionID] = time.Now()
-	ec.sessionsLock.Unlock()
-
 	return false
 }

diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index dd42bb6..543c314 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -12,7 +12,7 @@ var Log zerolog.Logger
 func Init(env string) {
 	zerolog.ErrorStackMarshaler = pkgerrors.MarshalStack
 	if env != "production" {
-		Log = zerolog.New(zerolog.ConsoleWriter{Out: os.Stderr, NoColor: false}).With().Timestamp().Logger()
+		Log = zerolog.New(zerolog.ConsoleWriter{Out: os.Stderr, NoColor: false, TimeFormat: "2006-01-02 15:04:05.000"}).With().Timestamp().Logger()
 	} else {
 		Log = zerolog.New(os.Stdout).With().Timestamp().Logger()
 	}

diff --git a/pkg/messaging/pubsub.go b/pkg/messaging/pubsub.go
index f8fb221..ab4f4d0 100644
--- a/pkg/messaging/pubsub.go
+++ b/pkg/messaging/pubsub.go
@@ -226,7 +226,7 @@ func (j *jetStreamPubSub) Subscribe(name string, topic string, handler func(msg
 		Durable:       sanitizeConsumerName(name),
 		AckPolicy:     jetstream.AckExplicitPolicy,
 		MaxDeliver:    3,
-		BackOff:       []time.Duration{60 * time.Second, 60 * time.Second, 60 * time.Second},
+		BackOff:       []time.Duration{30 * time.Second, 30 * time.Second, 30 * time.Second},
 		DeliverPolicy: jetstream.DeliverAllPolicy, // Deliver all messages
 		FilterSubject: topic,
 		AckWait:       30 * time.Second, // explicitly set the ack wait

diff --git a/pkg/mpc/ecdsa_signing_session.go b/pkg/mpc/ecdsa_signing_session.go
index 76c95e9..051a3a8 100644
--- a/pkg/mpc/ecdsa_signing_session.go
+++ b/pkg/mpc/ecdsa_signing_session.go
@@ -107,9 +107,8 @@ func (s *SigningSession) Init(tx *big.Int) error {
 	}
 
 	if len(s.participantPeerIDs) < keyInfo.Threshold+1 {
-		err := fmt.Errorf("Not enough participants to sign")
 		logger.Warn("Not enough participants to sign", "participants", s.participantPeerIDs, "expected", keyInfo.Threshold+1)
-		return err
+		return ErrNotEnoughParticipants
 	}
 
 	// check if t+1 participants are present

diff --git a/pkg/mpc/eddsa_signing_session.go b/pkg/mpc/eddsa_signing_session.go
index f7cbd01..efb6449 100644
--- a/pkg/mpc/eddsa_signing_session.go
+++ b/pkg/mpc/eddsa_signing_session.go
@@ -93,7 +93,8 @@ func (s *EDDSASigningSession) Init(tx *big.Int) error {
 	}
 
 	if len(s.participantPeerIDs) < keyInfo.Threshold+1 {
-		return fmt.Errorf("Not enough participants to sign, expected %d, got %d", keyInfo.Threshold+1, len(s.participantPeerIDs))
+		logger.Warn("Not enough participants to sign", "expected", keyInfo.Threshold+1, "got", len(s.participantPeerIDs))
+		return ErrNotEnoughParticipants
 	}
 
 	// check if t+1 participants are present

diff --git a/pkg/mpc/session.go b/pkg/mpc/session.go
index 34fdfa6..7a62e34 100644
--- a/pkg/mpc/session.go
+++ b/pkg/mpc/session.go
@@ -15,6 +15,10 @@ import (
 	"github.com/nats-io/nats.go"
 )
 
+var (
+	ErrNotEnoughParticipants = errors.New("not enough participants to sign")
+)
+
 type TopicComposer struct {
 	ComposeBroadcastTopic func() string
 	ComposeDirectTopic    func(nodeID string) string
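
A note on why duplicate signing events occur at all: with AckExplicitPolicy, JetStream redelivers any message that is not acked within AckWait, up to MaxDeliver attempts spaced by the BackOff intervals. A signing session can easily outlive the 30-second AckWait, so the same event comes back while the first session is still in flight — which is what the activeSessions map and the natMsg.Term() call guard against (Term tells the server to stop redelivering that message). A minimal standalone sketch of these semantics, assuming the nats.go jetstream package (the config fields match the patch); the stream name, subject, and dedup helper below are illustrative, not repository code:

package main

import (
	"context"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL)
	js, _ := jetstream.New(nc)

	// Mirrors the consumer settings in pkg/messaging/pubsub.go: an unacked
	// message is redelivered after AckWait, up to MaxDeliver times, with
	// BackOff controlling the delay between attempts.
	cons, _ := js.CreateOrUpdateConsumer(context.Background(), "SIGNING", jetstream.ConsumerConfig{
		Durable:       "signing-consumer",
		AckPolicy:     jetstream.AckExplicitPolicy,
		MaxDeliver:    3,
		BackOff:       []time.Duration{30 * time.Second, 30 * time.Second, 30 * time.Second},
		DeliverPolicy: jetstream.DeliverAllPolicy,
		FilterSubject: "signing.request",
		AckWait:       30 * time.Second,
	})

	cons.Consume(func(msg jetstream.Msg) {
		if alreadyProcessing(msg) {
			msg.Term() // duplicate: stop all further redeliveries of this message
			return
		}
		// ... start the signing session, then Ack once it is safe to do so
		msg.Ack()
	})
	select {}
}

// alreadyProcessing stands in for the activeSessions lookup in consumer.go.
func alreadyProcessing(msg jetstream.Msg) bool { return false }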
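
A note on the sentinel-error change: the old check err.Error() == "Not enough participants to sign" silently breaks the retry path the moment the message text changes. A package-level sentinel compared with errors.Is is the idiomatic replacement. A small self-contained sketch of the pattern — the names mirror the patch, but the %w wrapping is an extra illustration, not something the patch itself does:

package main

import (
	"errors"
	"fmt"
)

// Sentinel error, mirroring pkg/mpc/session.go
var ErrNotEnoughParticipants = errors.New("not enough participants to sign")

func initSession(participants, threshold int) error {
	if participants < threshold+1 {
		// Wrapping with %w keeps errors.Is working through added context.
		return fmt.Errorf("init failed: %w", ErrNotEnoughParticipants)
	}
	return nil
}

func main() {
	if err := initSession(1, 1); errors.Is(err, ErrNotEnoughParticipants) {
		fmt.Println("retry later:", err) // matches even though the error was wrapped
	}
}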
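
On the TODO about replacing the fixed time.Sleep(1 * time.Second): one hedged sketch of what explicit coordination through request-response could look like, using plain NATS request/reply. The subject names, peer list, and helper names below are hypothetical — point2point.go may expose something different:

package coordination

import (
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
)

// announceReady lets a node answer readiness probes once its
// incoming-message listeners are set up.
func announceReady(nc *nats.Conn, nodeID string) error {
	_, err := nc.Subscribe(fmt.Sprintf("mpc.ready.%s", nodeID), func(m *nats.Msg) {
		m.Respond([]byte("ok"))
	})
	return err
}

// waitForPeers blocks until every peer answers a readiness probe,
// instead of sleeping for a fixed second and hoping the timing works out.
func waitForPeers(nc *nats.Conn, peers []string, timeout time.Duration) error {
	for _, p := range peers {
		if _, err := nc.Request(fmt.Sprintf("mpc.ready.%s", p), nil, timeout); err != nil {
			return fmt.Errorf("peer %s not ready: %w", p, err)
		}
	}
	return nil
}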