Skip to content
41 changes: 9 additions & 32 deletions cmd/mpcium/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ func runNode(ctx context.Context, c *cli.Command) error {
if err != nil {
logger.Fatal("Failed to connect to NATS", err)
}
defer natsConn.Close()

pubsub := messaging.NewNATSPubSub(natsConn)
keygenBroker, err := messaging.NewJetStreamBroker(ctx, natsConn, event.KeygenBrokerStream, []string{
Expand Down Expand Up @@ -162,7 +161,7 @@ func runNode(ctx context.Context, c *cli.Command) error {
logger.Info("Node is running", "ID", nodeID, "name", nodeName)

peerNodeIDs := GetPeerIDs(peers)
peerRegistry := mpc.NewRegistry(nodeID, peerNodeIDs, consulClient.KV(), directMessaging)
peerRegistry := mpc.NewRegistry(nodeID, peerNodeIDs, consulClient.KV(), directMessaging, pubsub, identityStore)

mpcNode := mpc.NewNode(
nodeID,
Expand All @@ -176,9 +175,6 @@ func runNode(ctx context.Context, c *cli.Command) error {
)
defer mpcNode.Close()

// ECDH session for DH key exchange
ecdhSession := mpcNode.GetECDHSession()

eventConsumer := eventconsumer.NewEventConsumer(
mpcNode,
pubsub,
Expand All @@ -197,21 +193,16 @@ func runNode(ctx context.Context, c *cli.Command) error {

timeoutConsumer.Run()
defer timeoutConsumer.Close()
keygenConsumer := eventconsumer.NewKeygenConsumer(natsConn, keygenBroker, pubsub, peerRegistry)
signingConsumer := eventconsumer.NewSigningConsumer(natsConn, signingBroker, pubsub, peerRegistry)
keygenConsumer := eventconsumer.NewKeygenConsumer(natsConn, keygenBroker, pubsub, peerRegistry, genKeyResultQueue)
signingConsumer := eventconsumer.NewSigningConsumer(natsConn, signingBroker, pubsub, peerRegistry, singingResultQueue)

// Make the node ready before starting the signing consumer
if err := peerRegistry.Ready(); err != nil {
logger.Error("Failed to mark peer registry as ready", err)
}
logger.Info("[READY] Node is ready", "nodeID", nodeID)

logger.Info("Waiting for ECDH key exchange to complete...", "nodeID", nodeID)
if err := ecdhSession.WaitForExchangeComplete(); err != nil {
logger.Fatal("ECDH exchange failed", err)
}

logger.Info("ECDH key exchange completed successfully, starting consumers...", "nodeID", nodeID)
logger.Info("Starting consumers", "nodeID", nodeID)
appContext, cancel := context.WithCancel(context.Background())
// Set up signal handling to cancel the context on termination signals.
go func() {
Expand All @@ -221,6 +212,11 @@ func runNode(ctx context.Context, c *cli.Command) error {
logger.Warn("Shutdown signal received, canceling context...")
cancel()

// Resign from peer registry first (before closing NATS)
if err := peerRegistry.Resign(); err != nil {
logger.Error("Failed to resign from peer registry", err)
}

// Gracefully close consumers
if err := keygenConsumer.Close(); err != nil {
logger.Error("Failed to close keygen consumer", err)
Expand All @@ -229,10 +225,6 @@ func runNode(ctx context.Context, c *cli.Command) error {
logger.Error("Failed to close signing consumer", err)
}

if err := ecdhSession.Close(); err != nil {
logger.Error("Failed to close ECDH session", err)
}

err := natsConn.Drain()
if err != nil {
logger.Error("Failed to drain NATS connection", err)
Expand Down Expand Up @@ -264,21 +256,6 @@ func runNode(ctx context.Context, c *cli.Command) error {
logger.Info("Signing consumer finished successfully")
}()

go func() {
for {
select {
case <-appContext.Done():
return
case err := <-ecdhSession.ErrChan():
if err != nil {
logger.Error("ECDH session error", err)
errChan <- fmt.Errorf("ecdh session error: %w", err)
return
}
}
}
}()

go func() {
wg.Wait()
logger.Info("All consumers have finished")
Expand Down
2 changes: 2 additions & 0 deletions pkg/event/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ const (
// Context and cancellation errors
ErrorCodeContextCancelled ErrorCode = "ERROR_CONTEXT_CANCELLED"
ErrorCodeOperationAborted ErrorCode = "ERROR_OPERATION_ABORTED"
ErrorCodeNotMajority ErrorCode = "ERROR_NOT_MAJORITY"
ErrorCodeClusterNotReady ErrorCode = "ERROR_CLUSTER_NOT_READY"
)

// GetErrorCodeFromError attempts to categorize a generic error into a specific error code
Expand Down
77 changes: 67 additions & 10 deletions pkg/eventconsumer/keygen_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package eventconsumer

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/fystack/mpcium/pkg/event"
"github.com/fystack/mpcium/pkg/logger"
"github.com/fystack/mpcium/pkg/messaging"
"github.com/fystack/mpcium/pkg/mpc"
"github.com/fystack/mpcium/pkg/types"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
Expand All @@ -31,22 +34,30 @@ type KeygenConsumer interface {

// keygenConsumer implements KeygenConsumer.
type keygenConsumer struct {
natsConn *nats.Conn
pubsub messaging.PubSub
jsBroker messaging.MessageBroker
peerRegistry mpc.PeerRegistry
natsConn *nats.Conn
pubsub messaging.PubSub
jsBroker messaging.MessageBroker
peerRegistry mpc.PeerRegistry
keygenResultQueue messaging.MessageQueue

// jsSub holds the JetStream subscription, so it can be cleaned up during Close().
jsSub messaging.MessageSubscription
}

// NewKeygenConsumer returns a new instance of KeygenConsumer.
func NewKeygenConsumer(natsConn *nats.Conn, jsBroker messaging.MessageBroker, pubsub messaging.PubSub, peerRegistry mpc.PeerRegistry) KeygenConsumer {
func NewKeygenConsumer(
natsConn *nats.Conn,
jsBroker messaging.MessageBroker,
pubsub messaging.PubSub,
peerRegistry mpc.PeerRegistry,
keygenResultQueue messaging.MessageQueue,
) KeygenConsumer {
return &keygenConsumer{
natsConn: natsConn,
pubsub: pubsub,
jsBroker: jsBroker,
peerRegistry: peerRegistry,
natsConn: natsConn,
pubsub: pubsub,
jsBroker: jsBroker,
peerRegistry: peerRegistry,
keygenResultQueue: keygenResultQueue,
}
}

Expand All @@ -60,6 +71,9 @@ func (sc *keygenConsumer) waitForAllPeersReadyToGenKey(ctx context.Context) erro
for {
select {
case <-ctx.Done():
if ctx.Err() == context.Canceled {
return nil
}
return ctx.Err()
case <-ticker.C:
allPeersReady := sc.peerRegistry.ArePeersReady()
Expand All @@ -80,6 +94,9 @@ func (sc *keygenConsumer) waitForAllPeersReadyToGenKey(ctx context.Context) erro
func (sc *keygenConsumer) Run(ctx context.Context) error {
// Wait for sufficient peers before starting to consume messages
if err := sc.waitForAllPeersReadyToGenKey(ctx); err != nil {
if err == context.Canceled {
return nil
}
return fmt.Errorf("failed to wait for sufficient peers: %w", err)
}

Expand All @@ -104,9 +121,22 @@ func (sc *keygenConsumer) Run(ctx context.Context) error {
}

func (sc *keygenConsumer) handleKeygenEvent(msg jetstream.Msg) {
raw := msg.Data()
var keygenMsg types.GenerateKeyMessage
sessionID := msg.Headers().Get("SessionID")

err := json.Unmarshal(raw, &keygenMsg)
if err != nil {
logger.Error("SigningConsumer: Failed to unmarshal keygen message", err)
sc.handleKeygenError(keygenMsg, event.ErrorCodeUnmarshalFailure, err, sessionID)
_ = msg.Ack()
return
}

if !sc.peerRegistry.ArePeersReady() {
logger.Warn("KeygenConsumer: Not all peers are ready to sign, skipping message processing")
logger.Warn("KeygenConsumer: Not all peers are ready to gen key, skipping message processing")
sc.handleKeygenError(keygenMsg, event.ErrorCodeClusterNotReady, errors.New("not all peers are ready"), sessionID)
_ = msg.Ack()
return
}

Expand Down Expand Up @@ -161,6 +191,33 @@ func (sc *keygenConsumer) handleKeygenEvent(msg jetstream.Msg) {
_ = msg.Nak()
}

// handleKeygenError reports a failed keygen request by publishing an
// error-typed KeygenResultEvent to the keygen result queue.
//
// The event carries errorCode (as a string), the wallet ID taken from
// keygenMsg, and the text of err as the error reason. It is JSON-encoded and
// enqueued on the topic produced by formatting mpc.TypeGenerateWalletResultFmt
// with the wallet ID; the idempotent key is obtained from buildIdempotentKey
// using the wallet ID, sessionID and the same topic format.
//
// The method returns nothing: a failure to marshal or enqueue the event is
// logged and otherwise dropped. A nil err is tolerated and results in an empty
// error reason instead of a panic.
func (sc *keygenConsumer) handleKeygenError(keygenMsg types.GenerateKeyMessage, errorCode event.ErrorCode, err error, sessionID string) {
	walletID := keygenMsg.WalletID

	// Guard against a nil error so that the error-reporting path itself can
	// never panic on err.Error().
	var reason string
	if err != nil {
		reason = err.Error()
	}

	keygenResult := event.KeygenResultEvent{
		ResultType:  event.ResultTypeError,
		ErrorCode:   string(errorCode),
		WalletID:    walletID,
		ErrorReason: reason,
	}

	// Use dedicated error variables below; the incoming err describes the
	// original failure and must not be overwritten by bookkeeping errors.
	payload, marshalErr := json.Marshal(keygenResult)
	if marshalErr != nil {
		logger.Error("Failed to marshal keygen result event", marshalErr,
			"walletID", walletID,
		)
		return
	}

	topic := fmt.Sprintf(mpc.TypeGenerateWalletResultFmt, walletID)
	enqueueErr := sc.keygenResultQueue.Enqueue(topic, payload, &messaging.EnqueueOptions{
		IdempotententKey: buildIdempotentKey(walletID, sessionID, mpc.TypeGenerateWalletResultFmt),
	})
	if enqueueErr != nil {
		logger.Error("Failed to enqueue keygen result event", enqueueErr,
			"walletID", walletID,
		)
	}
}

// Close unsubscribes from the JetStream subject and cleans up resources.
func (sc *keygenConsumer) Close() error {
if sc.jsSub != nil {
Expand Down
Loading
Loading