From 6c84341094099d5160ea501473f4b6bcc6b8a18c Mon Sep 17 00:00:00 2001 From: Azzurriii Date: Wed, 30 Jul 2025 20:11:38 +0700 Subject: [PATCH 1/5] Fix error message when signing with duplicate tx id --- pkg/event/types.go | 2 ++ pkg/eventconsumer/event_consumer.go | 9 +++++++++ 2 files changed, 11 insertions(+) diff --git a/pkg/event/types.go b/pkg/event/types.go index 0344bf6..36856f3 100644 --- a/pkg/event/types.go +++ b/pkg/event/types.go @@ -137,6 +137,8 @@ func GetErrorCodeFromError(err error) ErrorCode { return ErrorCodeContextCancelled case contains(errStr, "invalid signature from initiator"): return ErrorCodeInvalidInitiatorSignature + case contains(errStr, "duplicate"): + return ErrorCodeSessionDuplicate default: return ErrorCodeUnknown } diff --git a/pkg/eventconsumer/event_consumer.go b/pkg/eventconsumer/event_consumer.go index 855c16c..db0c707 100644 --- a/pkg/eventconsumer/event_consumer.go +++ b/pkg/eventconsumer/event_consumer.go @@ -323,6 +323,15 @@ func (ec *eventConsumer) consumeTxSigningEvent() error { // Check for duplicate session and track if new if ec.checkDuplicateSession(msg.WalletID, msg.TxID) { + duplicateErr := fmt.Errorf("duplicate signing request detected for walletID=%s txID=%s", msg.WalletID, msg.TxID) + ec.handleSigningSessionError( + msg.WalletID, + msg.TxID, + msg.NetworkInternalCode, + duplicateErr, + "Duplicate signing request detected", + natMsg, + ) return } From 4e27bea480ac62c2c6ee25f034c98bef564086fb Mon Sep 17 00:00:00 2001 From: Azzurriii Date: Wed, 30 Jul 2025 21:36:24 +0700 Subject: [PATCH 2/5] Update error message for duplicate signing requests to be more concise --- pkg/eventconsumer/event_consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/eventconsumer/event_consumer.go b/pkg/eventconsumer/event_consumer.go index db0c707..6f81026 100644 --- a/pkg/eventconsumer/event_consumer.go +++ b/pkg/eventconsumer/event_consumer.go @@ -329,7 +329,7 @@ func (ec *eventConsumer) consumeTxSigningEvent() error { msg.TxID, msg.NetworkInternalCode, duplicateErr, - "Duplicate signing request detected", + "Duplicate session", natMsg, ) return From ab87ec1edfd3e7580a2c9a8641b2b75f7888ef4b Mon Sep 17 00:00:00 2001 From: Azzurriii Date: Sun, 3 Aug 2025 19:01:27 +0700 Subject: [PATCH 3/5] Enhance IdempontentKey pattern for event consumer to include SessionID --- pkg/eventconsumer/event_consumer.go | 13 ++++++++++++- pkg/mpc/ecdsa_signing_session.go | 4 +++- pkg/mpc/eddsa_signing_session.go | 4 +++- pkg/mpc/node.go | 3 +++ pkg/mpc/session.go | 5 +++-- 5 files changed, 24 insertions(+), 5 deletions(-) diff --git a/pkg/eventconsumer/event_consumer.go b/pkg/eventconsumer/event_consumer.go index 6f81026..7847792 100644 --- a/pkg/eventconsumer/event_consumer.go +++ b/pkg/eventconsumer/event_consumer.go @@ -336,6 +336,7 @@ func (ec *eventConsumer) consumeTxSigningEvent() error { } var session mpc.SigningSession + idempotentKey := composeSigningIdempotentKey(msg.TxID, natMsg) switch msg.KeyType { case types.KeyTypeSecp256k1: session, err = ec.node.CreateSigningSession( @@ -344,6 +345,7 @@ func (ec *eventConsumer) consumeTxSigningEvent() error { msg.TxID, msg.NetworkInternalCode, ec.signingResultQueue, + idempotentKey, ) case types.KeyTypeEd25519: session, err = ec.node.CreateSigningSession( @@ -352,6 +354,7 @@ func (ec *eventConsumer) consumeTxSigningEvent() error { msg.TxID, msg.NetworkInternalCode, ec.signingResultQueue, + idempotentKey, ) } @@ -477,7 +480,7 @@ func (ec *eventConsumer) handleSigningSessionError(walletID, txID, networkIntern return } err = ec.signingResultQueue.Enqueue(event.SigningResultCompleteTopic, signingResultBytes, &messaging.EnqueueOptions{ - IdempotententKey: txID, + IdempotententKey: composeSigningIdempotentKey(txID, natMsg), }) if err != nil { logger.Error("Failed to enqueue signing result event", err, @@ -809,3 +812,11 @@ func composeKeygenIdempotentKey(walletID string, natMsg *nats.Msg) string { } return fmt.Sprintf(mpc.TypeGenerateWalletResultFmt, uniqueKey) } + +func composeSigningIdempotentKey(txID string, natMsg *nats.Msg) string { + sid := natMsg.Header.Get("SessionID") + if sid != "" { + return fmt.Sprintf("%s:%s", txID, sid) + } + return txID +} diff --git a/pkg/mpc/ecdsa_signing_session.go b/pkg/mpc/ecdsa_signing_session.go index 76cba2c..ba45ce3 100644 --- a/pkg/mpc/ecdsa_signing_session.go +++ b/pkg/mpc/ecdsa_signing_session.go @@ -52,6 +52,7 @@ func newECDSASigningSession( keyinfoStore keyinfo.Store, resultQueue messaging.MessageQueue, identityStore identity.Store, + idempotentKey string, ) *ecdsaSigningSession { return &ecdsaSigningSession{ session: session{ @@ -81,6 +82,7 @@ func newECDSASigningSession( getRoundFunc: GetEcdsaMsgRound, resultQueue: resultQueue, identityStore: identityStore, + idempotentKey: idempotentKey, }, endCh: make(chan *common.SignatureData), txID: txID, @@ -178,7 +180,7 @@ func (s *ecdsaSigningSession) Sign(onSuccess func(data []byte)) { } err = s.resultQueue.Enqueue(event.SigningResultCompleteTopic, bytes, &messaging.EnqueueOptions{ - IdempotententKey: s.txID, + IdempotententKey: s.idempotentKey, }) if err != nil { s.ErrCh <- errors.Wrap(err, "Failed to publish sign success message") diff --git a/pkg/mpc/eddsa_signing_session.go b/pkg/mpc/eddsa_signing_session.go index 4538730..fe037b8 100644 --- a/pkg/mpc/eddsa_signing_session.go +++ b/pkg/mpc/eddsa_signing_session.go @@ -43,6 +43,7 @@ func newEDDSASigningSession( keyinfoStore keyinfo.Store, resultQueue messaging.MessageQueue, identityStore identity.Store, + idempotentKey string, ) *eddsaSigningSession { return &eddsaSigningSession{ session: session{ @@ -72,6 +73,7 @@ func newEDDSASigningSession( getRoundFunc: GetEddsaMsgRound, resultQueue: resultQueue, identityStore: identityStore, + idempotentKey: idempotentKey, }, endCh: make(chan *common.SignatureData), txID: txID, @@ -167,7 +169,7 @@ func (s *eddsaSigningSession) Sign(onSuccess func(data []byte)) { } err = s.resultQueue.Enqueue(event.SigningResultCompleteTopic, bytes, &messaging.EnqueueOptions{ - IdempotententKey: s.txID, + IdempotententKey: s.idempotentKey, }) if err != nil { s.ErrCh <- errors.Wrap(err, "Failed to publish sign success message") diff --git a/pkg/mpc/node.go b/pkg/mpc/node.go index 4daf65d..22340a4 100644 --- a/pkg/mpc/node.go +++ b/pkg/mpc/node.go @@ -165,6 +165,7 @@ func (p *Node) CreateSigningSession( txID string, networkInternalCode string, resultQueue messaging.MessageQueue, + idempotentKey string, ) (SigningSession, error) { version := p.getVersion(sessionType, walletID) keyInfo, err := p.getKeyInfo(sessionType, walletID) @@ -211,6 +212,7 @@ func (p *Node) CreateSigningSession( p.keyinfoStore, resultQueue, p.identityStore, + idempotentKey, ), nil case SessionTypeEDDSA: @@ -228,6 +230,7 @@ func (p *Node) CreateSigningSession( p.keyinfoStore, resultQueue, p.identityStore, + idempotentKey, ), nil } diff --git a/pkg/mpc/session.go b/pkg/mpc/session.go index ece9c18..645ecbe 100644 --- a/pkg/mpc/session.go +++ b/pkg/mpc/session.go @@ -72,8 +72,9 @@ type session struct { getRoundFunc GetRoundFunc mu sync.Mutex // After the session is done, the key will be stored pubkeyBytes - pubkeyBytes []byte - sessionType SessionType + pubkeyBytes []byte + sessionType SessionType + idempotentKey string } func (s *session) PartyID() *tss.PartyID { From 058a95c7db2ea01b1364e2b15dea13e2848b2586 Mon Sep 17 00:00:00 2001 From: Azzurriii Date: Sun, 3 Aug 2025 20:26:34 +0700 Subject: [PATCH 4/5] Update composeSigningIdempotentKey to match key gen pattern --- pkg/eventconsumer/event_consumer.go | 7 +++++-- pkg/mpc/session.go | 1 + 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/eventconsumer/event_consumer.go b/pkg/eventconsumer/event_consumer.go index 7847792..5d49203 100644 --- a/pkg/eventconsumer/event_consumer.go +++ b/pkg/eventconsumer/event_consumer.go @@ -814,9 +814,12 @@ func composeKeygenIdempotentKey(walletID string, natMsg *nats.Msg) string { } func composeSigningIdempotentKey(txID string, natMsg *nats.Msg) string { + var uniqueKey string sid := natMsg.Header.Get("SessionID") if sid != "" { - return fmt.Sprintf("%s:%s", txID, sid) + uniqueKey = fmt.Sprintf("%s:%s", txID, sid) + } else { + uniqueKey = txID } - return txID + return fmt.Sprintf(mpc.TypeSigningResultFmt, uniqueKey) } diff --git a/pkg/mpc/session.go b/pkg/mpc/session.go index 645ecbe..8326b40 100644 --- a/pkg/mpc/session.go +++ b/pkg/mpc/session.go @@ -22,6 +22,7 @@ type SessionType string const ( TypeGenerateWalletResultFmt = "mpc.mpc_keygen_result.%s" TypeReshareWalletResultFmt = "mpc.mpc_reshare_result.%s" + TypeSigningResultFmt = "mpc.mpc_signing_result.%s" SessionTypeECDSA SessionType = "session_ecdsa" SessionTypeEDDSA SessionType = "session_eddsa" From 62d3441bb14ae4cb70738cbe1901caae19d68804 Mon Sep 17 00:00:00 2001 From: Azzurriii Date: Sun, 3 Aug 2025 22:26:53 +0700 Subject: [PATCH 5/5] Enhance compose key gen function to get better structure --- pkg/eventconsumer/event_consumer.go | 50 ++++++++++++++++------------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/pkg/eventconsumer/event_consumer.go b/pkg/eventconsumer/event_consumer.go index 5d49203..b387057 100644 --- a/pkg/eventconsumer/event_consumer.go +++ b/pkg/eventconsumer/event_consumer.go @@ -512,7 +512,7 @@ func (ec *eventConsumer) consumeReshareEvent() error { var msg types.ResharingMessage if err := json.Unmarshal(natMsg.Data, &msg); err != nil { logger.Error("Failed to unmarshal resharing message", err) - ec.handleReshareSessionError(msg.WalletID, msg.KeyType, msg.NewThreshold, err, "Failed to unmarshal resharing message") + ec.handleReshareSessionError(msg.WalletID, msg.KeyType, msg.NewThreshold, err, "Failed to unmarshal resharing message", natMsg) return } @@ -523,13 +523,14 @@ func (ec *eventConsumer) consumeReshareEvent() error { msg.NewThreshold, errors.New("validation: session ID is empty"), "Session ID is empty", + natMsg, ) return } if err := ec.identityStore.VerifyInitiatorMessage(&msg); err != nil { logger.Error("Failed to verify initiator message", err) - ec.handleReshareSessionError(msg.WalletID, msg.KeyType, msg.NewThreshold, err, "Failed to verify initiator message") + ec.handleReshareSessionError(msg.WalletID, msg.KeyType, msg.NewThreshold, err, "Failed to verify initiator message", natMsg) return } @@ -539,7 +540,7 @@ func (ec *eventConsumer) consumeReshareEvent() error { sessionType, err := sessionTypeFromKeyType(keyType) if err != nil { logger.Error("Failed to get session type", err) - ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to get session type") + ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to get session type", natMsg) return } @@ -558,13 +559,13 @@ func (ec *eventConsumer) consumeReshareEvent() error { oldSession, err := createSession(false) if err != nil { logger.Error("Failed to create old reshare session", err, "walletID", walletID) - ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to create old reshare session") + ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to create old reshare session", natMsg) return } newSession, err := createSession(true) if err != nil { logger.Error("Failed to create new reshare session", err, "walletID", walletID) - ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to create new reshare session") + ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to create new reshare session", natMsg) return } @@ -600,7 +601,7 @@ func (ec *eventConsumer) consumeReshareEvent() error { return case err := <-oldSession.ErrChan(): logger.Error("Old reshare session error", err) - ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Old reshare session error") + ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Old reshare session error", natMsg) doneOld() // Cancel the context to stop this session return } @@ -624,7 +625,7 @@ func (ec *eventConsumer) consumeReshareEvent() error { return case err := <-newSession.ErrChan(): logger.Error("New reshare session error", err) - ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "New reshare session error") + ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "New reshare session error", natMsg) doneNew() // Cancel the context to stop this session return } @@ -640,7 +641,7 @@ func (ec *eventConsumer) consumeReshareEvent() error { successBytes, err := json.Marshal(successEvent) if err != nil { logger.Error("Failed to marshal reshare success event", err) - ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to marshal reshare success event") + ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to marshal reshare success event", natMsg) return } @@ -649,11 +650,11 @@ func (ec *eventConsumer) consumeReshareEvent() error { key, successBytes, &messaging.EnqueueOptions{ - IdempotententKey: key, + IdempotententKey: composeReshareIdempotentKey(msg.SessionID, natMsg), }) if err != nil { logger.Error("Failed to publish reshare success message", err) - ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to publish reshare success message") + ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to publish reshare success message", natMsg) return } logger.Info("[COMPLETED RESHARE] Successfully published", "walletID", walletID) @@ -673,6 +674,7 @@ func (ec *eventConsumer) handleReshareSessionError( newThreshold int, err error, contextMsg string, + natMsg *nats.Msg, ) { fullErrMsg := fmt.Sprintf("%s: %v", contextMsg, err) errorCode := event.GetErrorCodeFromError(err) @@ -705,7 +707,7 @@ func (ec *eventConsumer) handleReshareSessionError( key := fmt.Sprintf(mpc.TypeReshareWalletResultFmt, walletID) err = ec.reshareResultQueue.Enqueue(key, reshareResultBytes, &messaging.EnqueueOptions{ - IdempotententKey: key, + IdempotententKey: composeReshareIdempotentKey(walletID, natMsg), }) if err != nil { logger.Error("Failed to enqueue reshare result event", err, @@ -802,24 +804,26 @@ func sessionTypeFromKeyType(keyType types.KeyType) (mpc.SessionType, error) { } } -func composeKeygenIdempotentKey(walletID string, natMsg *nats.Msg) string { +// composeIdempotentKey creates an idempotent key for different MPC operation types +func composeIdempotentKey(baseID string, natMsg *nats.Msg, formatTemplate string) string { var uniqueKey string sid := natMsg.Header.Get("SessionID") if sid != "" { - uniqueKey = fmt.Sprintf("%s:%s", walletID, sid) + uniqueKey = fmt.Sprintf("%s:%s", baseID, sid) } else { - uniqueKey = walletID + uniqueKey = baseID } - return fmt.Sprintf(mpc.TypeGenerateWalletResultFmt, uniqueKey) + return fmt.Sprintf(formatTemplate, uniqueKey) +} + +func composeKeygenIdempotentKey(walletID string, natMsg *nats.Msg) string { + return composeIdempotentKey(walletID, natMsg, mpc.TypeGenerateWalletResultFmt) } func composeSigningIdempotentKey(txID string, natMsg *nats.Msg) string { - var uniqueKey string - sid := natMsg.Header.Get("SessionID") - if sid != "" { - uniqueKey = fmt.Sprintf("%s:%s", txID, sid) - } else { - uniqueKey = txID - } - return fmt.Sprintf(mpc.TypeSigningResultFmt, uniqueKey) + return composeIdempotentKey(txID, natMsg, mpc.TypeSigningResultFmt) +} + +func composeReshareIdempotentKey(sessionID string, natMsg *nats.Msg) string { + return composeIdempotentKey(sessionID, natMsg, mpc.TypeReshareWalletResultFmt) }