Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
f32c677
Add resharing functionality with event handling and session management
vietddude Jul 1, 2025
87f1e77
preload preparams
vietddude Jul 1, 2025
ebe0b9a
Fix KeyTypeEd25519 declaration for consistency in eventconsumer and m…
vietddude Jul 1, 2025
2620fcc
Remove go.uber.org/mock v0.3.0 dependency from go.mod and go.sum
vietddude Jul 1, 2025
9661ab3
Update reshare event handling logic and clean up ECDSA reshare sessio…
vietddude Jul 1, 2025
9a19171
Enhance reshare session management by introducing versioning and upda…
vietddude Jul 1, 2025
2321371
complete reshare ecdsa supports input threshold and peer ids
vietddude Jul 2, 2025
0723c0d
support eddsa reshare
vietddude Jul 2, 2025
dc27b9a
remove debug log
vietddude Jul 2, 2025
a1df6bf
chore: resolve PR review comments
vietddude Jul 2, 2025
25d89d6
refactor: improve session type handling and key storage logic
vietddude Jul 2, 2025
c48abd8
Fix issue key not found after resharing
anhthii Jul 3, 2025
6a83226
Populate version in signing session
anhthii Jul 3, 2025
2a3d21a
Adjust signing logic, only require t+1 nodes to participate
anhthii Jul 3, 2025
350b48d
Better error handling
anhthii Jul 3, 2025
d2129f0
Enrich signing session message
anhthii Jul 3, 2025
0d99150
Improve nats configuration
anhthii Jul 3, 2025
b7a6185
Fix signing eddsa crash on key v1
anhthii Jul 3, 2025
70544af
Implement coordination, waiting for enough peers before consuming sig…
anhthii Jul 3, 2025
ce8a642
Rename to improve consistent naming
anhthii Jul 3, 2025
bcc8019
Include error in keygen and reshare event
anhthii Jul 3, 2025
21867b0
Rename success event -> result event
anhthii Jul 3, 2025
721c817
Fix resharing, only require t+1 participants, add session_id to remov…
anhthii Jul 4, 2025
1849bb8
Refactor handle resharing error
anhthii Jul 4, 2025
14d6952
Consistent reshare result topic and fix Inconsistent Success Event
anhthii Jul 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 38 additions & 17 deletions cmd/mpcium/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os/signal"
"path/filepath"
"syscall"
"time"

"github.com/fystack/mpcium/pkg/config"
"github.com/fystack/mpcium/pkg/constant"
Expand Down Expand Up @@ -130,14 +131,17 @@ func runNode(ctx context.Context, c *cli.Command) error {

directMessaging := messaging.NewNatsDirectMessaging(natsConn)
mqManager := messaging.NewNATsMessageQueueManager("mpc", []string{
"mpc.mpc_keygen_success.*",
"mpc.mpc_keygen_result.*",
event.SigningResultTopic,
"mpc.mpc_reshare_result.*",
}, natsConn)

genKeySuccessQueue := mqManager.NewMessageQueue("mpc_keygen_success")
defer genKeySuccessQueue.Close()
singingResultQueue := mqManager.NewMessageQueue("signing_result")
genKeyResultQueue := mqManager.NewMessageQueue("mpc_keygen_result")
defer genKeyResultQueue.Close()
singingResultQueue := mqManager.NewMessageQueue("mpc_signing_result")
defer singingResultQueue.Close()
reshareResultQueue := mqManager.NewMessageQueue("mpc_reshare_result")
defer reshareResultQueue.Close()

logger.Info("Node is running", "peerID", nodeID, "name", nodeName)

Expand All @@ -159,8 +163,9 @@ func runNode(ctx context.Context, c *cli.Command) error {
eventConsumer := eventconsumer.NewEventConsumer(
mpcNode,
pubsub,
genKeySuccessQueue,
genKeyResultQueue,
singingResultQueue,
reshareResultQueue,
identityStore,
)
eventConsumer.Run()
Expand All @@ -173,7 +178,7 @@ func runNode(ctx context.Context, c *cli.Command) error {

timeoutConsumer.Run()
defer timeoutConsumer.Close()
signingConsumer := eventconsumer.NewSigningConsumer(natsConn, signingStream, pubsub)
signingConsumer := eventconsumer.NewSigningConsumer(natsConn, signingStream, pubsub, peerRegistry)

// Make the node ready before starting the signing consumer
peerRegistry.Ready()
Expand Down Expand Up @@ -340,16 +345,32 @@ func NewBadgerKV(nodeName string) *kvstore.BadgerKVStore {
}

func GetNATSConnection(environment string) (*nats.Conn, error) {
if environment != constant.EnvProduction {
return nats.Connect(viper.GetString("nats.url"))
url := viper.GetString("nats.url")
opts := []nats.Option{
nats.MaxReconnects(-1), // retry forever
nats.ReconnectWait(2 * time.Second),
nats.DisconnectHandler(func(nc *nats.Conn) {
logger.Warn("Disconnected from NATS")
}),
nats.ReconnectHandler(func(nc *nats.Conn) {
logger.Info("Reconnected to NATS", "url", nc.ConnectedUrl())
}),
nats.ClosedHandler(func(nc *nats.Conn) {
logger.Info("NATS connection closed!")
}),
}
clientCert := filepath.Join(".", "certs", "client-cert.pem")
clientKey := filepath.Join(".", "certs", "client-key.pem")
caCert := filepath.Join(".", "certs", "rootCA.pem")

return nats.Connect(viper.GetString("nats.url"),
nats.ClientCert(clientCert, clientKey),
nats.RootCAs(caCert),
nats.UserInfo(viper.GetString("nats.username"), viper.GetString("nats.password")),
)

if environment == constant.EnvProduction {
clientCert := filepath.Join(".", "certs", "client-cert.pem")
clientKey := filepath.Join(".", "certs", "client-key.pem")
caCert := filepath.Join(".", "certs", "rootCA.pem")

opts = append(opts,
nats.ClientCert(clientCert, clientKey),
nats.RootCAs(caCert),
nats.UserInfo(viper.GetString("nats.username"), viper.GetString("nats.password")),
)
}

return nats.Connect(url, opts...)
}
2 changes: 1 addition & 1 deletion examples/generate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func main() {
}

// STEP 2: Register the result handler AFTER all walletIDs are stored
err = mpcClient.OnWalletCreationResult(func(event event.KeygenSuccessEvent) {
err = mpcClient.OnWalletCreationResult(func(event event.KeygenResultEvent) {
now := time.Now()
startTimeAny, ok := walletStartTimes.Load(event.WalletID)
if ok {
Expand Down
69 changes: 69 additions & 0 deletions examples/reshare/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main

import (
"fmt"
"os"
"os/signal"
"syscall"

"github.com/fystack/mpcium/pkg/client"
"github.com/fystack/mpcium/pkg/config"
"github.com/fystack/mpcium/pkg/event"
"github.com/fystack/mpcium/pkg/logger"
"github.com/fystack/mpcium/pkg/types"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
"github.com/spf13/viper"
)

func main() {
const environment = "dev"
config.InitViperConfig()
logger.Init(environment, true)

natsURL := viper.GetString("nats.url")
natsConn, err := nats.Connect(natsURL)
if err != nil {
logger.Fatal("Failed to connect to NATS", err)
}
defer natsConn.Drain()
defer natsConn.Close()

mpcClient := client.NewMPCClient(client.Options{
NatsConn: natsConn,
KeyPath: "./event_initiator.key",
})

// 3) Listen for signing results
err = mpcClient.OnResharingResult(func(evt event.ResharingResultEvent) {
logger.Info("Resharing result received",
"walletID", evt.WalletID,
"pubKey", fmt.Sprintf("%x", evt.PubKey),
"newThreshold", evt.NewThreshold,
"keyType", evt.KeyType,
)
})
if err != nil {
logger.Fatal("Failed to subscribe to OnResharingResult", err)
}

resharingMsg := &types.ResharingMessage{
SessionID: uuid.NewString(),
WalletID: "bf2cc849-8e55-47e4-ab73-e17fb1eb690c",
NodeIDs: []string{"d926fa75-72c7-4538-9052-4a064a84981d", "7b1090cd-ffe3-46ff-8375-594dd3204169"}, // new peer IDs

NewThreshold: 2, // t+1 <= len(NodeIDs)
KeyType: types.KeyTypeEd25519,
}
err = mpcClient.Resharing(resharingMsg)
if err != nil {
logger.Fatal("Resharing failed", err)
}
fmt.Printf("Resharing(%q) sent, awaiting result...\n", resharingMsg.WalletID)

stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
<-stop

fmt.Println("Shutting down.")
}
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ require (
github.com/spf13/viper v1.18.0
github.com/stretchr/testify v1.10.0
github.com/urfave/cli/v3 v3.3.2
go.uber.org/mock v0.3.0
golang.org/x/term v0.31.0
)

Expand Down Expand Up @@ -68,7 +67,7 @@ require (
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rogpeppe/go-internal v1.13.1 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A=
github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
Expand Down Expand Up @@ -373,8 +375,6 @@ go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/mock v0.3.0 h1:3mUxI1No2/60yUYax92Pt8eNOEecx2D3lcXZh2NEZJo=
go.uber.org/mock v0.3.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
Expand Down
86 changes: 68 additions & 18 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,28 @@ import (
)

const (
GenerateWalletSuccessTopic = "mpc.mpc_keygen_success.*" // wildcard to listen to all success events
GenerateWalletSuccessTopic = "mpc.mpc_keygen_result.*" // wildcard to listen to all success events
ResharingSuccessTopic = "mpc.mpc_reshare_result.*" // wildcard to listen to all success events
)

type MPCClient interface {
CreateWallet(walletID string) error
OnWalletCreationResult(callback func(event event.KeygenSuccessEvent)) error
OnWalletCreationResult(callback func(event event.KeygenResultEvent)) error

SignTransaction(msg *types.SignTxMessage) error
OnSignResult(callback func(event event.SigningResultEvent)) error

Resharing(msg *types.ResharingMessage) error
OnResharingResult(callback func(event event.ResharingResultEvent)) error
}

type mpcClient struct {
signingStream messaging.StreamPubsub
pubsub messaging.PubSub
genKeySuccessQueue messaging.MessageQueue
signResultQueue messaging.MessageQueue
privKey ed25519.PrivateKey
signingStream messaging.StreamPubsub
pubsub messaging.PubSub
genKeySuccessQueue messaging.MessageQueue
signResultQueue messaging.MessageQueue
reshareSuccessQueue messaging.MessageQueue
privKey ed25519.PrivateKey
}

// Options defines configuration options for creating a new MPCClient
Expand Down Expand Up @@ -120,19 +125,22 @@ func NewMPCClient(opts Options) MPCClient {
pubsub := messaging.NewNATSPubSub(opts.NatsConn)

manager := messaging.NewNATsMessageQueueManager("mpc", []string{
"mpc.mpc_keygen_success.*",
"mpc.signing_result.*",
"mpc.mpc_keygen_result.*",
"mpc.mpc_signing_result.*",
"mpc.mpc_reshare_result.*",
}, opts.NatsConn)

genKeySuccessQueue := manager.NewMessageQueue("mpc_keygen_success")
signResultQueue := manager.NewMessageQueue("signing_result")
genKeySuccessQueue := manager.NewMessageQueue("mpc_keygen_result")
signResultQueue := manager.NewMessageQueue("mpc_signing_result")
reshareSuccessQueue := manager.NewMessageQueue("mpc_reshare_result")

return &mpcClient{
signingStream: signingStream,
pubsub: pubsub,
genKeySuccessQueue: genKeySuccessQueue,
signResultQueue: signResultQueue,
privKey: priv,
signingStream: signingStream,
pubsub: pubsub,
genKeySuccessQueue: genKeySuccessQueue,
signResultQueue: signResultQueue,
reshareSuccessQueue: reshareSuccessQueue,
privKey: priv,
}
}

Expand Down Expand Up @@ -185,9 +193,9 @@ func (c *mpcClient) CreateWallet(walletID string) error {
}

// The callback will be invoked whenever a wallet creation result is received.
func (c *mpcClient) OnWalletCreationResult(callback func(event event.KeygenSuccessEvent)) error {
func (c *mpcClient) OnWalletCreationResult(callback func(event event.KeygenResultEvent)) error {
err := c.genKeySuccessQueue.Dequeue(GenerateWalletSuccessTopic, func(msg []byte) error {
var event event.KeygenSuccessEvent
var event event.KeygenResultEvent
err := json.Unmarshal(msg, &event)
if err != nil {
return err
Expand Down Expand Up @@ -241,3 +249,45 @@ func (c *mpcClient) OnSignResult(callback func(event event.SigningResultEvent))

return nil
}

func (c *mpcClient) Resharing(msg *types.ResharingMessage) error {
// compute the canonical raw bytes
raw, err := msg.Raw()
if err != nil {
return fmt.Errorf("Resharing: raw payload error: %w", err)
}
// sign
msg.Signature = ed25519.Sign(c.privKey, raw)

bytes, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("Resharing: marshal error: %w", err)
}

if err := c.pubsub.Publish(eventconsumer.MPCReshareEvent, bytes); err != nil {
return fmt.Errorf("Resharing: publish error: %w", err)
}
return nil
}

func (c *mpcClient) OnResharingResult(callback func(event event.ResharingResultEvent)) error {

err := c.reshareSuccessQueue.Dequeue(ResharingSuccessTopic, func(msg []byte) error {
logger.Info("Received reshare success message", "raw", string(msg))
var event event.ResharingResultEvent
err := json.Unmarshal(msg, &event)
if err != nil {
logger.Error("Failed to unmarshal reshare success event", err, "raw", string(msg))
return err
}
logger.Info("Deserialized reshare success event", "event", event)
callback(event)
return nil
})

if err != nil {
return fmt.Errorf("OnResharingResult: subscribe error: %w", err)
}

return nil
}
7 changes: 0 additions & 7 deletions pkg/event/generate.go

This file was deleted.

11 changes: 11 additions & 0 deletions pkg/event/keygen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package event

type KeygenResultEvent struct {
WalletID string `json:"wallet_id"`
ECDSAPubKey []byte `json:"ecdsa_pub_key"`
EDDSAPubKey []byte `json:"eddsa_pub_key"`

ResultType ResultType `json:"result_type"`
ErrorReason string `json:"error_reason"`
ErrorCode string `json:"error_code"`
}
14 changes: 14 additions & 0 deletions pkg/event/reshare.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package event

import "github.com/fystack/mpcium/pkg/types"

type ResharingResultEvent struct {
WalletID string `json:"wallet_id"`
NewThreshold int `json:"new_threshold"`
KeyType types.KeyType `json:"key_type"`
PubKey []byte `json:"pub_key"`

ResultType ResultType `json:"result_type"`
ErrorReason string `json:"error_reason"`
ErrorCode string `json:"error_code"`
}
Loading