Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 34 additions & 16 deletions pkg/eventconsumer/event_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {
}

var session mpc.SigningSession
idempotentKey := composeSigningIdempotentKey(msg.TxID, natMsg)
switch msg.KeyType {
case types.KeyTypeSecp256k1:
session, err = ec.node.CreateSigningSession(
Expand All @@ -344,6 +345,7 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {
msg.TxID,
msg.NetworkInternalCode,
ec.signingResultQueue,
idempotentKey,
)
case types.KeyTypeEd25519:
session, err = ec.node.CreateSigningSession(
Expand All @@ -352,6 +354,7 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {
msg.TxID,
msg.NetworkInternalCode,
ec.signingResultQueue,
idempotentKey,
)

}
Expand Down Expand Up @@ -477,7 +480,7 @@ func (ec *eventConsumer) handleSigningSessionError(walletID, txID, networkIntern
return
}
err = ec.signingResultQueue.Enqueue(event.SigningResultCompleteTopic, signingResultBytes, &messaging.EnqueueOptions{
IdempotententKey: txID,
IdempotententKey: composeSigningIdempotentKey(txID, natMsg),
})
if err != nil {
logger.Error("Failed to enqueue signing result event", err,
Expand Down Expand Up @@ -509,7 +512,7 @@ func (ec *eventConsumer) consumeReshareEvent() error {
var msg types.ResharingMessage
if err := json.Unmarshal(natMsg.Data, &msg); err != nil {
logger.Error("Failed to unmarshal resharing message", err)
ec.handleReshareSessionError(msg.WalletID, msg.KeyType, msg.NewThreshold, err, "Failed to unmarshal resharing message")
ec.handleReshareSessionError(msg.WalletID, msg.KeyType, msg.NewThreshold, err, "Failed to unmarshal resharing message", natMsg)
return
}

Expand All @@ -520,13 +523,14 @@ func (ec *eventConsumer) consumeReshareEvent() error {
msg.NewThreshold,
errors.New("validation: session ID is empty"),
"Session ID is empty",
natMsg,
)
return
}

if err := ec.identityStore.VerifyInitiatorMessage(&msg); err != nil {
logger.Error("Failed to verify initiator message", err)
ec.handleReshareSessionError(msg.WalletID, msg.KeyType, msg.NewThreshold, err, "Failed to verify initiator message")
ec.handleReshareSessionError(msg.WalletID, msg.KeyType, msg.NewThreshold, err, "Failed to verify initiator message", natMsg)
return
}

Expand All @@ -536,7 +540,7 @@ func (ec *eventConsumer) consumeReshareEvent() error {
sessionType, err := sessionTypeFromKeyType(keyType)
if err != nil {
logger.Error("Failed to get session type", err)
ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to get session type")
ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to get session type", natMsg)
return
}

Expand All @@ -555,13 +559,13 @@ func (ec *eventConsumer) consumeReshareEvent() error {
oldSession, err := createSession(false)
if err != nil {
logger.Error("Failed to create old reshare session", err, "walletID", walletID)
ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to create old reshare session")
ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to create old reshare session", natMsg)
return
}
newSession, err := createSession(true)
if err != nil {
logger.Error("Failed to create new reshare session", err, "walletID", walletID)
ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to create new reshare session")
ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to create new reshare session", natMsg)
return
}

Expand Down Expand Up @@ -597,7 +601,7 @@ func (ec *eventConsumer) consumeReshareEvent() error {
return
case err := <-oldSession.ErrChan():
logger.Error("Old reshare session error", err)
ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Old reshare session error")
ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Old reshare session error", natMsg)
doneOld() // Cancel the context to stop this session
return
}
Expand All @@ -621,7 +625,7 @@ func (ec *eventConsumer) consumeReshareEvent() error {
return
case err := <-newSession.ErrChan():
logger.Error("New reshare session error", err)
ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "New reshare session error")
ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "New reshare session error", natMsg)
doneNew() // Cancel the context to stop this session
return
}
Expand All @@ -637,7 +641,7 @@ func (ec *eventConsumer) consumeReshareEvent() error {
successBytes, err := json.Marshal(successEvent)
if err != nil {
logger.Error("Failed to marshal reshare success event", err)
ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to marshal reshare success event")
ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to marshal reshare success event", natMsg)
return
}

Expand All @@ -646,11 +650,11 @@ func (ec *eventConsumer) consumeReshareEvent() error {
key,
successBytes,
&messaging.EnqueueOptions{
IdempotententKey: key,
IdempotententKey: composeReshareIdempotentKey(msg.SessionID, natMsg),
})
if err != nil {
logger.Error("Failed to publish reshare success message", err)
ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to publish reshare success message")
ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to publish reshare success message", natMsg)
return
}
logger.Info("[COMPLETED RESHARE] Successfully published", "walletID", walletID)
Expand All @@ -670,6 +674,7 @@ func (ec *eventConsumer) handleReshareSessionError(
newThreshold int,
err error,
contextMsg string,
natMsg *nats.Msg,
) {
fullErrMsg := fmt.Sprintf("%s: %v", contextMsg, err)
errorCode := event.GetErrorCodeFromError(err)
Expand Down Expand Up @@ -702,7 +707,7 @@ func (ec *eventConsumer) handleReshareSessionError(

key := fmt.Sprintf(mpc.TypeReshareWalletResultFmt, walletID)
err = ec.reshareResultQueue.Enqueue(key, reshareResultBytes, &messaging.EnqueueOptions{
IdempotententKey: key,
IdempotententKey: composeReshareIdempotentKey(walletID, natMsg),
})
if err != nil {
logger.Error("Failed to enqueue reshare result event", err,
Expand Down Expand Up @@ -799,13 +804,26 @@ func sessionTypeFromKeyType(keyType types.KeyType) (mpc.SessionType, error) {
}
}

func composeKeygenIdempotentKey(walletID string, natMsg *nats.Msg) string {
// composeIdempotentKey creates an idempotent key for different MPC operation types
func composeIdempotentKey(baseID string, natMsg *nats.Msg, formatTemplate string) string {
var uniqueKey string
sid := natMsg.Header.Get("SessionID")
if sid != "" {
uniqueKey = fmt.Sprintf("%s:%s", walletID, sid)
uniqueKey = fmt.Sprintf("%s:%s", baseID, sid)
} else {
uniqueKey = walletID
uniqueKey = baseID
}
return fmt.Sprintf(mpc.TypeGenerateWalletResultFmt, uniqueKey)
return fmt.Sprintf(formatTemplate, uniqueKey)
}

func composeKeygenIdempotentKey(walletID string, natMsg *nats.Msg) string {
return composeIdempotentKey(walletID, natMsg, mpc.TypeGenerateWalletResultFmt)
}

func composeSigningIdempotentKey(txID string, natMsg *nats.Msg) string {
return composeIdempotentKey(txID, natMsg, mpc.TypeSigningResultFmt)
}

func composeReshareIdempotentKey(sessionID string, natMsg *nats.Msg) string {
return composeIdempotentKey(sessionID, natMsg, mpc.TypeReshareWalletResultFmt)
}
4 changes: 3 additions & 1 deletion pkg/mpc/ecdsa_signing_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func newECDSASigningSession(
keyinfoStore keyinfo.Store,
resultQueue messaging.MessageQueue,
identityStore identity.Store,
idempotentKey string,
) *ecdsaSigningSession {
return &ecdsaSigningSession{
session: session{
Expand Down Expand Up @@ -81,6 +82,7 @@ func newECDSASigningSession(
getRoundFunc: GetEcdsaMsgRound,
resultQueue: resultQueue,
identityStore: identityStore,
idempotentKey: idempotentKey,
},
endCh: make(chan *common.SignatureData),
txID: txID,
Expand Down Expand Up @@ -178,7 +180,7 @@ func (s *ecdsaSigningSession) Sign(onSuccess func(data []byte)) {
}

err = s.resultQueue.Enqueue(event.SigningResultCompleteTopic, bytes, &messaging.EnqueueOptions{
IdempotententKey: s.txID,
IdempotententKey: s.idempotentKey,
})
if err != nil {
s.ErrCh <- errors.Wrap(err, "Failed to publish sign success message")
Expand Down
4 changes: 3 additions & 1 deletion pkg/mpc/eddsa_signing_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func newEDDSASigningSession(
keyinfoStore keyinfo.Store,
resultQueue messaging.MessageQueue,
identityStore identity.Store,
idempotentKey string,
) *eddsaSigningSession {
return &eddsaSigningSession{
session: session{
Expand Down Expand Up @@ -72,6 +73,7 @@ func newEDDSASigningSession(
getRoundFunc: GetEddsaMsgRound,
resultQueue: resultQueue,
identityStore: identityStore,
idempotentKey: idempotentKey,
},
endCh: make(chan *common.SignatureData),
txID: txID,
Expand Down Expand Up @@ -167,7 +169,7 @@ func (s *eddsaSigningSession) Sign(onSuccess func(data []byte)) {
}

err = s.resultQueue.Enqueue(event.SigningResultCompleteTopic, bytes, &messaging.EnqueueOptions{
IdempotententKey: s.txID,
IdempotententKey: s.idempotentKey,
})
if err != nil {
s.ErrCh <- errors.Wrap(err, "Failed to publish sign success message")
Expand Down
3 changes: 3 additions & 0 deletions pkg/mpc/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func (p *Node) CreateSigningSession(
txID string,
networkInternalCode string,
resultQueue messaging.MessageQueue,
idempotentKey string,
) (SigningSession, error) {
version := p.getVersion(sessionType, walletID)
keyInfo, err := p.getKeyInfo(sessionType, walletID)
Expand Down Expand Up @@ -211,6 +212,7 @@ func (p *Node) CreateSigningSession(
p.keyinfoStore,
resultQueue,
p.identityStore,
idempotentKey,
), nil

case SessionTypeEDDSA:
Expand All @@ -228,6 +230,7 @@ func (p *Node) CreateSigningSession(
p.keyinfoStore,
resultQueue,
p.identityStore,
idempotentKey,
), nil
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/mpc/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type SessionType string
const (
TypeGenerateWalletResultFmt = "mpc.mpc_keygen_result.%s"
TypeReshareWalletResultFmt = "mpc.mpc_reshare_result.%s"
TypeSigningResultFmt = "mpc.mpc_signing_result.%s"

SessionTypeECDSA SessionType = "session_ecdsa"
SessionTypeEDDSA SessionType = "session_eddsa"
Expand Down Expand Up @@ -72,8 +73,9 @@ type session struct {
getRoundFunc GetRoundFunc
mu sync.Mutex
// After the session is done, the key will be stored pubkeyBytes
pubkeyBytes []byte
sessionType SessionType
pubkeyBytes []byte
sessionType SessionType
idempotentKey string
}

func (s *session) PartyID() *tss.PartyID {
Expand Down
Loading