diff --git a/pkg/eventconsumer/event_consumer.go b/pkg/eventconsumer/event_consumer.go index 6f81026..b387057 100644 --- a/pkg/eventconsumer/event_consumer.go +++ b/pkg/eventconsumer/event_consumer.go @@ -336,6 +336,7 @@ func (ec *eventConsumer) consumeTxSigningEvent() error { } var session mpc.SigningSession + idempotentKey := composeSigningIdempotentKey(msg.TxID, natMsg) switch msg.KeyType { case types.KeyTypeSecp256k1: session, err = ec.node.CreateSigningSession( @@ -344,6 +345,7 @@ func (ec *eventConsumer) consumeTxSigningEvent() error { msg.TxID, msg.NetworkInternalCode, ec.signingResultQueue, + idempotentKey, ) case types.KeyTypeEd25519: session, err = ec.node.CreateSigningSession( @@ -352,6 +354,7 @@ func (ec *eventConsumer) consumeTxSigningEvent() error { msg.TxID, msg.NetworkInternalCode, ec.signingResultQueue, + idempotentKey, ) } @@ -477,7 +480,7 @@ func (ec *eventConsumer) handleSigningSessionError(walletID, txID, networkIntern return } err = ec.signingResultQueue.Enqueue(event.SigningResultCompleteTopic, signingResultBytes, &messaging.EnqueueOptions{ - IdempotententKey: txID, + IdempotententKey: composeSigningIdempotentKey(txID, natMsg), }) if err != nil { logger.Error("Failed to enqueue signing result event", err, @@ -509,7 +512,7 @@ func (ec *eventConsumer) consumeReshareEvent() error { var msg types.ResharingMessage if err := json.Unmarshal(natMsg.Data, &msg); err != nil { logger.Error("Failed to unmarshal resharing message", err) - ec.handleReshareSessionError(msg.WalletID, msg.KeyType, msg.NewThreshold, err, "Failed to unmarshal resharing message") + ec.handleReshareSessionError(msg.WalletID, msg.KeyType, msg.NewThreshold, err, "Failed to unmarshal resharing message", natMsg) return } @@ -520,13 +523,14 @@ func (ec *eventConsumer) consumeReshareEvent() error { msg.NewThreshold, errors.New("validation: session ID is empty"), "Session ID is empty", + natMsg, ) return } if err := ec.identityStore.VerifyInitiatorMessage(&msg); err != nil { logger.Error("Failed to verify initiator message", err) - ec.handleReshareSessionError(msg.WalletID, msg.KeyType, msg.NewThreshold, err, "Failed to verify initiator message") + ec.handleReshareSessionError(msg.WalletID, msg.KeyType, msg.NewThreshold, err, "Failed to verify initiator message", natMsg) return } @@ -536,7 +540,7 @@ func (ec *eventConsumer) consumeReshareEvent() error { sessionType, err := sessionTypeFromKeyType(keyType) if err != nil { logger.Error("Failed to get session type", err) - ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to get session type") + ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to get session type", natMsg) return } @@ -555,13 +559,13 @@ func (ec *eventConsumer) consumeReshareEvent() error { oldSession, err := createSession(false) if err != nil { logger.Error("Failed to create old reshare session", err, "walletID", walletID) - ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to create old reshare session") + ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to create old reshare session", natMsg) return } newSession, err := createSession(true) if err != nil { logger.Error("Failed to create new reshare session", err, "walletID", walletID) - ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to create new reshare session") + ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to create new reshare session", natMsg) return } @@ -597,7 +601,7 @@ func (ec *eventConsumer) consumeReshareEvent() error { return case err := <-oldSession.ErrChan(): logger.Error("Old reshare session error", err) - ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Old reshare session error") + ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Old reshare session error", natMsg) doneOld() // Cancel the context to stop this session return } @@ -621,7 +625,7 @@ func (ec *eventConsumer) consumeReshareEvent() error { return case err := <-newSession.ErrChan(): logger.Error("New reshare session error", err) - ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "New reshare session error") + ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "New reshare session error", natMsg) doneNew() // Cancel the context to stop this session return } @@ -637,7 +641,7 @@ func (ec *eventConsumer) consumeReshareEvent() error { successBytes, err := json.Marshal(successEvent) if err != nil { logger.Error("Failed to marshal reshare success event", err) - ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to marshal reshare success event") + ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to marshal reshare success event", natMsg) return } @@ -646,11 +650,11 @@ func (ec *eventConsumer) consumeReshareEvent() error { key, successBytes, &messaging.EnqueueOptions{ - IdempotententKey: key, + IdempotententKey: composeReshareIdempotentKey(msg.SessionID, natMsg), }) if err != nil { logger.Error("Failed to publish reshare success message", err) - ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to publish reshare success message") + ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to publish reshare success message", natMsg) return } logger.Info("[COMPLETED RESHARE] Successfully published", "walletID", walletID) @@ -670,6 +674,7 @@ func (ec *eventConsumer) handleReshareSessionError( newThreshold int, err error, contextMsg string, + natMsg *nats.Msg, ) { fullErrMsg := fmt.Sprintf("%s: %v", contextMsg, err) errorCode := event.GetErrorCodeFromError(err) @@ -702,7 +707,7 @@ func (ec *eventConsumer) handleReshareSessionError( key := fmt.Sprintf(mpc.TypeReshareWalletResultFmt, walletID) err = ec.reshareResultQueue.Enqueue(key, reshareResultBytes, &messaging.EnqueueOptions{ - IdempotententKey: key, + IdempotententKey: composeReshareIdempotentKey(walletID, natMsg), }) if err != nil { logger.Error("Failed to enqueue reshare result event", err, @@ -799,13 +804,26 @@ func sessionTypeFromKeyType(keyType types.KeyType) (mpc.SessionType, error) { } } -func composeKeygenIdempotentKey(walletID string, natMsg *nats.Msg) string { +// composeIdempotentKey creates an idempotent key for different MPC operation types +func composeIdempotentKey(baseID string, natMsg *nats.Msg, formatTemplate string) string { var uniqueKey string sid := natMsg.Header.Get("SessionID") if sid != "" { - uniqueKey = fmt.Sprintf("%s:%s", walletID, sid) + uniqueKey = fmt.Sprintf("%s:%s", baseID, sid) } else { - uniqueKey = walletID + uniqueKey = baseID } - return fmt.Sprintf(mpc.TypeGenerateWalletResultFmt, uniqueKey) + return fmt.Sprintf(formatTemplate, uniqueKey) +} + +func composeKeygenIdempotentKey(walletID string, natMsg *nats.Msg) string { + return composeIdempotentKey(walletID, natMsg, mpc.TypeGenerateWalletResultFmt) +} + +func composeSigningIdempotentKey(txID string, natMsg *nats.Msg) string { + return composeIdempotentKey(txID, natMsg, mpc.TypeSigningResultFmt) +} + +func composeReshareIdempotentKey(sessionID string, natMsg *nats.Msg) string { + return composeIdempotentKey(sessionID, natMsg, mpc.TypeReshareWalletResultFmt) } diff --git a/pkg/mpc/ecdsa_signing_session.go b/pkg/mpc/ecdsa_signing_session.go index 76cba2c..ba45ce3 100644 --- a/pkg/mpc/ecdsa_signing_session.go +++ b/pkg/mpc/ecdsa_signing_session.go @@ -52,6 +52,7 @@ func newECDSASigningSession( keyinfoStore keyinfo.Store, resultQueue messaging.MessageQueue, identityStore identity.Store, + idempotentKey string, ) *ecdsaSigningSession { return &ecdsaSigningSession{ session: session{ @@ -81,6 +82,7 @@ func newECDSASigningSession( getRoundFunc: GetEcdsaMsgRound, resultQueue: resultQueue, identityStore: identityStore, + idempotentKey: idempotentKey, }, endCh: make(chan *common.SignatureData), txID: txID, @@ -178,7 +180,7 @@ func (s *ecdsaSigningSession) Sign(onSuccess func(data []byte)) { } err = s.resultQueue.Enqueue(event.SigningResultCompleteTopic, bytes, &messaging.EnqueueOptions{ - IdempotententKey: s.txID, + IdempotententKey: s.idempotentKey, }) if err != nil { s.ErrCh <- errors.Wrap(err, "Failed to publish sign success message") diff --git a/pkg/mpc/eddsa_signing_session.go b/pkg/mpc/eddsa_signing_session.go index 4538730..fe037b8 100644 --- a/pkg/mpc/eddsa_signing_session.go +++ b/pkg/mpc/eddsa_signing_session.go @@ -43,6 +43,7 @@ func newEDDSASigningSession( keyinfoStore keyinfo.Store, resultQueue messaging.MessageQueue, identityStore identity.Store, + idempotentKey string, ) *eddsaSigningSession { return &eddsaSigningSession{ session: session{ @@ -72,6 +73,7 @@ func newEDDSASigningSession( getRoundFunc: GetEddsaMsgRound, resultQueue: resultQueue, identityStore: identityStore, + idempotentKey: idempotentKey, }, endCh: make(chan *common.SignatureData), txID: txID, @@ -167,7 +169,7 @@ func (s *eddsaSigningSession) Sign(onSuccess func(data []byte)) { } err = s.resultQueue.Enqueue(event.SigningResultCompleteTopic, bytes, &messaging.EnqueueOptions{ - IdempotententKey: s.txID, + IdempotententKey: s.idempotentKey, }) if err != nil { s.ErrCh <- errors.Wrap(err, "Failed to publish sign success message") diff --git a/pkg/mpc/node.go b/pkg/mpc/node.go index 4daf65d..22340a4 100644 --- a/pkg/mpc/node.go +++ b/pkg/mpc/node.go @@ -165,6 +165,7 @@ func (p *Node) CreateSigningSession( txID string, networkInternalCode string, resultQueue messaging.MessageQueue, + idempotentKey string, ) (SigningSession, error) { version := p.getVersion(sessionType, walletID) keyInfo, err := p.getKeyInfo(sessionType, walletID) @@ -211,6 +212,7 @@ func (p *Node) CreateSigningSession( p.keyinfoStore, resultQueue, p.identityStore, + idempotentKey, ), nil case SessionTypeEDDSA: @@ -228,6 +230,7 @@ func (p *Node) CreateSigningSession( p.keyinfoStore, resultQueue, p.identityStore, + idempotentKey, ), nil } diff --git a/pkg/mpc/session.go b/pkg/mpc/session.go index ece9c18..8326b40 100644 --- a/pkg/mpc/session.go +++ b/pkg/mpc/session.go @@ -22,6 +22,7 @@ type SessionType string const ( TypeGenerateWalletResultFmt = "mpc.mpc_keygen_result.%s" TypeReshareWalletResultFmt = "mpc.mpc_reshare_result.%s" + TypeSigningResultFmt = "mpc.mpc_signing_result.%s" SessionTypeECDSA SessionType = "session_ecdsa" SessionTypeEDDSA SessionType = "session_eddsa" @@ -72,8 +73,9 @@ type session struct { getRoundFunc GetRoundFunc mu sync.Mutex // After the session is done, the key will be stored pubkeyBytes - pubkeyBytes []byte - sessionType SessionType + pubkeyBytes []byte + sessionType SessionType + idempotentKey string } func (s *session) PartyID() *tss.PartyID {