Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,11 @@ func main() {

timeoutConsumer.Run()
defer timeoutConsumer.Close()

signingCounsumer := eventconsumer.NewSigningConsumer(natsConn, signingStream, pubsub)

// Make the node ready before starting the signing consumer
peerRegistry.Ready()

signingCounsumer.Run()
defer signingCounsumer.Close()

Expand Down
111 changes: 102 additions & 9 deletions pkg/eventconsumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package eventconsumer
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"math/big"
Expand Down Expand Up @@ -35,6 +36,13 @@ type eventConsumer struct {

keyGenerationSub messaging.Subscription
signingSub messaging.Subscription

// Track active sessions with timestamps for cleanup
activeSessions map[string]time.Time // Maps "walletID-txID" to creation time
sessionsLock sync.RWMutex
cleanupInterval time.Duration // How often to run cleanup
sessionTimeout time.Duration // How long before a session is considered stale
cleanupStopChan chan struct{} // Signal to stop cleanup goroutine
}

func NewEventConsumer(
Expand All @@ -43,12 +51,21 @@ func NewEventConsumer(
genKeySucecssQueue messaging.MessageQueue,
signingResultQueue messaging.MessageQueue,
) EventConsumer {
return &eventConsumer{
ec := &eventConsumer{
node: node,
pubsub: pubsub,
genKeySucecssQueue: genKeySucecssQueue,
signingResultQueue: signingResultQueue,
activeSessions: make(map[string]time.Time),
cleanupInterval: 5 * time.Minute, // Run cleanup every 5 minutes
sessionTimeout: 30 * time.Minute, // Consider sessions older than 30 minutes stale
cleanupStopChan: make(chan struct{}),
}

// Start background cleanup goroutine
go ec.sessionCleanupRoutine()

return ec
}

func (ec *eventConsumer) Run() {
Expand Down Expand Up @@ -129,9 +146,6 @@ func (ec *eventConsumer) consumeKeyGenerationEvent() error {
go eddsaSession.GenerateKey(doneEddsa)

wg.Wait()
if err != nil {
logger.Error("Errors when closing sessions", err)
}
logger.Info("Closing section successfully!", "event", successEvent)

successEventBytes, err := json.Marshal(successEvent)
Expand All @@ -149,9 +163,6 @@ func (ec *eventConsumer) consumeKeyGenerationEvent() error {
}

logger.Info("[COMPLETED KEY GEN] Key generation completed successfully", "walletID", walletID)
if err != nil {
logger.Error("Failed to close session", err)
}

})

Expand All @@ -175,6 +186,12 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {
logger.Info("Received signing event", "waleltID", msg.WalletID, "type", msg.KeyType, "tx", msg.TxID)
threshold := 1

// Check for duplicate session and track if new
if ec.checkDuplicateSession(msg.WalletID, msg.TxID) {
natMsg.Term()
return
}

var session mpc.ISigningSession
switch msg.KeyType {
case KeyTypeSecp256k1:
Expand Down Expand Up @@ -204,14 +221,18 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {
txBigInt := new(big.Int).SetBytes(msg.Tx)
err = session.Init(txBigInt)
if err != nil {
if err.Error() == "Not enough participants to sign" {
if errors.Is(err, mpc.ErrNotEnoughParticipants) {
logger.Info("RETRY LATER: Not enough participants to sign")
//Return for retry later
return
}
ec.handleSigningSessionError(msg.WalletID, msg.TxID, msg.NetworkInternalCode, err, "Failed to init signing session", natMsg)
return
}

// Mark session as already processed
ec.addSession(msg.WalletID, msg.TxID)

ctx, done := context.WithCancel(context.Background())
go func() {
for {
Expand All @@ -228,7 +249,13 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {
}()

session.ListenToIncomingMessageAsync()
// TODO: use consul distributed lock here
// TODO: use consul distributed lock here, only sign after all nodes has already completed listing to incoming message async
// The purpose of the sleep is to be ensuring that the node has properly set up its message listeners
// before it starts the signing process. If the signing process starts sending messages before other nodes
// have set up their listeners, those messages might be missed, potentially causing the signing process to fail.
// One solution:
// The messaging includes mechanisms for direct point-to-point communication (in point2point.go).
// The nodes could explicitly coordinate through request-response patterns before starting signing
time.Sleep(1 * time.Second)
go session.Sign(done, natMsg) // use go routine to not block the event susbscriber
})
Expand Down Expand Up @@ -271,8 +298,74 @@ func (ec *eventConsumer) handleSigningSessionError(walletID, txID, NetworkIntern
}
}

// sessionCleanupRoutine runs in its own goroutine and purges stale session
// entries every cleanupInterval until Close signals shutdown through
// cleanupStopChan.
func (ec *eventConsumer) sessionCleanupRoutine() {
	ticker := time.NewTicker(ec.cleanupInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ec.cleanupStopChan:
			// Close was called; stop the background cleanup.
			return
		case <-ticker.C:
			ec.cleanupStaleSessions()
		}
	}
}

// cleanupStaleSessions removes every tracked session older than
// sessionTimeout. Invoked periodically by the cleanup goroutine.
func (ec *eventConsumer) cleanupStaleSessions() {
	ec.sessionsLock.Lock()
	defer ec.sessionsLock.Unlock()

	now := time.Now()
	for id, createdAt := range ec.activeSessions {
		age := now.Sub(createdAt)
		if age <= ec.sessionTimeout {
			continue
		}
		logger.Info("Cleaning up stale session", "sessionID", id, "age", age)
		delete(ec.activeSessions, id)
	}
}

// addSession records the walletID-txID pair as an active session, stamped
// with the current time so cleanupStaleSessions can expire it later.
func (ec *eventConsumer) addSession(walletID, txID string) {
sessionID := fmt.Sprintf("%s-%s", walletID, txID)
ec.sessionsLock.Lock()
ec.activeSessions[sessionID] = time.Now()
ec.sessionsLock.Unlock()
}

// removeSession drops the walletID-txID pair from active-session tracking.
func (ec *eventConsumer) removeSession(walletID, txID string) {
	key := walletID + "-" + txID
	ec.sessionsLock.Lock()
	defer ec.sessionsLock.Unlock()
	delete(ec.activeSessions, key)
}

// checkDuplicateSession reports whether a session for walletID-txID is
// already being tracked. It does NOT register the session itself; callers
// register it separately via addSession once the session initializes.
//
// NOTE(review): the check and the later addSession are not atomic — two
// concurrent events for the same walletID-txID could both pass this check
// before either registers. Confirm whether upstream delivery semantics
// (single consumer per subject) make this window safe.
func (ec *eventConsumer) checkDuplicateSession(walletID, txID string) bool {
sessionID := fmt.Sprintf("%s-%s", walletID, txID)

// Read lock only: this function never mutates the map.
ec.sessionsLock.RLock()
_, isDuplicate := ec.activeSessions[sessionID]
ec.sessionsLock.RUnlock()

if isDuplicate {
logger.Info("Duplicate signing request detected", "walletID", walletID, "txID", txID)
return true
}

return false
}

// Close and clean up
func (ec *eventConsumer) Close() error {
// Signal cleanup routine to stop
close(ec.cleanupStopChan)

err := ec.keyGenerationSub.Unsubscribe()
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ var Log zerolog.Logger
func Init(env string) {
zerolog.ErrorStackMarshaler = pkgerrors.MarshalStack
if env != "production" {
Log = zerolog.New(zerolog.ConsoleWriter{Out: os.Stderr, NoColor: false}).With().Timestamp().Logger()
Log = zerolog.New(zerolog.ConsoleWriter{Out: os.Stderr, NoColor: false, TimeFormat: "2006-01-02 15:04:05.000"}).With().Timestamp().Logger()
} else {
Log = zerolog.New(os.Stdout).With().Timestamp().Logger()
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/messaging/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,10 @@ func (j *jetStreamPubSub) Subscribe(name string, topic string, handler func(msg
Durable: sanitizeConsumerName(name),
AckPolicy: jetstream.AckExplicitPolicy,
MaxDeliver: 3,
BackOff: []time.Duration{60 * time.Second, 60 * time.Second, 60 * time.Second},
BackOff: []time.Duration{30 * time.Second, 30 * time.Second, 30 * time.Second},
DeliverPolicy: jetstream.DeliverAllPolicy, // Deliver all messages
FilterSubject: topic,
AckWait: 30 * time.Second, // explicitly set ack wait here
}

logger.Info("Creating consumer", "config", consumerConfig, "stream", j.config.streamName)
Expand Down
3 changes: 1 addition & 2 deletions pkg/mpc/ecdsa_signing_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,8 @@ func (s *SigningSession) Init(tx *big.Int) error {
}

if len(s.participantPeerIDs) < keyInfo.Threshold+1 {
err := fmt.Errorf("Not enough participants to sign")
logger.Warn("Not enough participants to sign", "participants", s.participantPeerIDs, "expected", keyInfo.Threshold+1)
return err
return ErrNotEnoughParticipants
}

// check if t+1 participants are present
Expand Down
3 changes: 2 additions & 1 deletion pkg/mpc/eddsa_signing_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func (s *EDDSASigningSession) Init(tx *big.Int) error {
}

if len(s.participantPeerIDs) < keyInfo.Threshold+1 {
return fmt.Errorf("Not enough participants to sign, expected %d, got %d", keyInfo.Threshold+1, len(s.participantPeerIDs))
logger.Warn("Not enough participants to sign, expected %d, got %d", keyInfo.Threshold+1, len(s.participantPeerIDs))
return ErrNotEnoughParticipants
}

// check if t+1 participants are present
Expand Down
1 change: 0 additions & 1 deletion pkg/mpc/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func NewNode(
}
logger.Info("Starting new node, preparams is generated successfully!")

peerRegistry.Ready()
go peerRegistry.WatchPeersReady()

return &Node{
Expand Down
4 changes: 4 additions & 0 deletions pkg/mpc/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import (
"github.com/nats-io/nats.go"
)

var (
ErrNotEnoughParticipants = errors.New("Not enough participants to sign")
)

type TopicComposer struct {
ComposeBroadcastTopic func() string
ComposeDirectTopic func(nodeID string) string
Expand Down