From 406da1d3f95aacddc20d35b294b59a427418bd47 Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Sat, 5 Apr 2025 00:34:11 +0200 Subject: [PATCH 1/4] feat: add support for per-message metadata This adds support for per-message (client-side) metadata md.MD to be passed in Gorums' Metadata object. This allows a client (or interceptor) to pass custom data to the server replicas. For example, a client can pass its client-id or request-id to the server, independent of the application-level message used. --- async.go | 2 +- channel_test.go | 24 +-- cmd/protoc-gen-gorums/dev/zorums.pb.go | 210 +++++++------------------ correctable.go | 2 +- multicast.go | 2 +- ordering/gorums_metadata.go | 38 +++++ ordering/ordering.pb.go | 151 ++++++++++++++---- ordering/ordering.proto | 7 + quorumcall.go | 2 +- rpc.go | 2 +- server.go | 2 +- tests/metadata/metadata_test.go | 38 ++++- unicast.go | 2 +- 13 files changed, 275 insertions(+), 207 deletions(-) create mode 100644 ordering/gorums_metadata.go diff --git a/async.go b/async.go index a383323a..c8782c68 100644 --- a/async.go +++ b/async.go @@ -46,7 +46,7 @@ type asyncCallState struct { // This function should only be used by generated code. func (c RawConfiguration) AsyncCall(ctx context.Context, d QuorumCallData) *Async { expectedReplies := len(c) - md := ordering.Metadata_builder{MessageID: c.getMsgID(), Method: d.Method}.Build() + md := ordering.NewGorumsMetadata(ctx, c.getMsgID(), d.Method) replyChan := make(chan response, expectedReplies) for _, n := range c { diff --git a/channel_test.go b/channel_test.go index f05ce4e9..47107208 100644 --- a/channel_test.go +++ b/channel_test.go @@ -44,7 +44,7 @@ func TestChannelCreation(t *testing.T) { if err != nil { t.Fatal(err) } - //the node should be closed manually because it isn't added to the configuration + // the node should be closed manually because it isn't added to the configuration defer node.close() mgr := dummyMgr() // a proper connection should NOT be established here @@ -52,8 +52,9 @@ func TestChannelCreation(t *testing.T) { replyChan := make(chan response, 1) go func() { - md := ordering.Metadata_builder{MessageID: 1, Method: handlerName}.Build() - req := request{ctx: context.Background(), msg: &Message{Metadata: md, Message: &mock.Request{}}} + ctx := context.Background() + md := ordering.NewGorumsMetadata(ctx, 1, handlerName) + req := request{ctx: ctx, msg: &Message{Metadata: md, Message: &mock.Request{}}} node.channel.enqueue(req, replyChan, false) }() select { @@ -118,7 +119,7 @@ func TestChannelReconnection(t *testing.T) { if err != nil { t.Fatal(err) } - //the node should be closed manually because it isn't added to the configuration + // the node should be closed manually because it isn't added to the configuration defer node.close() mgr := dummyMgr() // a proper connection should NOT be established here because server is not started @@ -127,8 +128,9 @@ func TestChannelReconnection(t *testing.T) { // send first message when server is down replyChan1 := make(chan response, 1) go func() { - md := ordering.Metadata_builder{MessageID: 1, Method: handlerName}.Build() - req := request{ctx: context.Background(), msg: &Message{Metadata: md, Message: &mock.Request{}}} + ctx := context.Background() + md := ordering.NewGorumsMetadata(ctx, 1, handlerName) + req := request{ctx: ctx, msg: &Message{Metadata: md, Message: &mock.Request{}}} node.channel.enqueue(req, replyChan1, false) }() @@ -147,8 +149,9 @@ func TestChannelReconnection(t *testing.T) { // send second message when server is up replyChan2 := make(chan response, 1) go func() { - md := ordering.Metadata_builder{MessageID: 2, Method: handlerName}.Build() - req := request{ctx: context.Background(), msg: &Message{Metadata: md, Message: &mock.Request{}}, opts: getCallOptions(E_Multicast, nil)} + ctx := context.Background() + md := ordering.NewGorumsMetadata(ctx, 2, handlerName) + req := request{ctx: ctx, msg: &Message{Metadata: md, Message: &mock.Request{}}, opts: getCallOptions(E_Multicast, nil)} node.channel.enqueue(req, replyChan2, false) }() @@ -167,8 +170,9 @@ func TestChannelReconnection(t *testing.T) { // send third message when server has been previously up but is now down replyChan3 := make(chan response, 1) go func() { - md := ordering.Metadata_builder{MessageID: 3, Method: handlerName}.Build() - req := request{ctx: context.Background(), msg: &Message{Metadata: md, Message: &mock.Request{}}} + ctx := context.Background() + md := ordering.NewGorumsMetadata(ctx, 3, handlerName) + req := request{ctx: ctx, msg: &Message{Metadata: md, Message: &mock.Request{}}} node.channel.enqueue(req, replyChan3, false) }() diff --git a/cmd/protoc-gen-gorums/dev/zorums.pb.go b/cmd/protoc-gen-gorums/dev/zorums.pb.go index 79c0d66d..8c1f169c 100644 --- a/cmd/protoc-gen-gorums/dev/zorums.pb.go +++ b/cmd/protoc-gen-gorums/dev/zorums.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.5 +// protoc-gen-go v1.36.6 // protoc v5.29.3 // source: zorums.proto @@ -195,158 +195,62 @@ func (b0 MyResponse_builder) Build() *MyResponse { var File_zorums_proto protoreflect.FileDescriptor -var file_zorums_proto_rawDesc = string([]byte{ - 0x0a, 0x0c, 0x7a, 0x6f, 0x72, 0x75, 0x6d, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, - 0x64, 0x65, 0x76, 0x1a, 0x0c, 0x67, 0x6f, 0x72, 0x75, 0x6d, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x1f, - 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x56, 0x61, 0x6c, - 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x22, - 0x22, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x52, - 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x52, 0x65, 0x73, - 0x75, 0x6c, 0x74, 0x22, 0x22, 0x0a, 0x0a, 0x4d, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x14, 0x0a, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x32, 0x86, 0x11, 0x0a, 0x0d, 0x5a, 0x6f, 0x72, 0x75, - 0x6d, 0x73, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x29, 0x0a, 0x08, 0x47, 0x52, 0x50, - 0x43, 0x43, 0x61, 0x6c, 0x6c, 0x12, 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x12, 0x2f, 0x0a, 0x0a, 0x51, 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x43, 0x61, - 0x6c, 0x6c, 0x12, 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x04, 0xa0, 0xb5, 0x18, 0x01, 0x12, 0x3d, 0x0a, 0x14, 0x51, 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x43, - 0x61, 0x6c, 0x6c, 0x50, 0x65, 0x72, 0x4e, 0x6f, 0x64, 0x65, 0x41, 0x72, 0x67, 0x12, 0x0c, 0x2e, - 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x64, 0x65, - 0x76, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xa0, 0xb5, 0x18, 0x01, - 0xa0, 0xb6, 0x18, 0x01, 0x12, 0x4d, 0x0a, 0x1a, 0x51, 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x43, 0x61, - 0x6c, 0x6c, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x52, 0x65, 0x74, 0x75, 0x72, 0x6e, 0x54, 0x79, - 0x70, 0x65, 0x12, 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x12, 0xa0, 0xb5, 0x18, 0x01, 0xf2, 0xb6, 0x18, 0x0a, 0x4d, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x46, 0x0a, 0x0f, 0x51, 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x43, 0x61, 0x6c, - 0x6c, 0x43, 0x6f, 0x6d, 0x62, 0x6f, 0x12, 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x16, 0xa0, 0xb5, 0x18, 0x01, 0xa0, 0xb6, 0x18, 0x01, 0xf2, 0xb6, 0x18, - 0x0a, 0x4d, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3e, 0x0a, 0x0f, 0x51, - 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x43, 0x61, 0x6c, 0x6c, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x16, - 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, - 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x04, 0xa0, 0xb5, 0x18, 0x01, 0x12, 0x3e, 0x0a, 0x10, 0x51, - 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x43, 0x61, 0x6c, 0x6c, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x32, 0x12, - 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, - 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, - 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x04, 0xa0, 0xb5, 0x18, 0x01, 0x12, 0x2e, 0x0a, 0x09, 0x4d, - 0x75, 0x6c, 0x74, 0x69, 0x63, 0x61, 0x73, 0x74, 0x12, 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x04, 0x98, 0xb5, 0x18, 0x01, 0x12, 0x3c, 0x0a, 0x13, 0x4d, - 0x75, 0x6c, 0x74, 0x69, 0x63, 0x61, 0x73, 0x74, 0x50, 0x65, 0x72, 0x4e, 0x6f, 0x64, 0x65, 0x41, - 0x72, 0x67, 0x12, 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x08, 0x98, 0xb5, 0x18, 0x01, 0xa0, 0xb6, 0x18, 0x01, 0x12, 0x2f, 0x0a, 0x0a, 0x4d, 0x75, 0x6c, - 0x74, 0x69, 0x63, 0x61, 0x73, 0x74, 0x32, 0x12, 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x04, 0x98, 0xb5, 0x18, 0x01, 0x12, 0x38, 0x0a, 0x0a, 0x4d, 0x75, - 0x6c, 0x74, 0x69, 0x63, 0x61, 0x73, 0x74, 0x33, 0x12, 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x04, - 0x98, 0xb5, 0x18, 0x01, 0x12, 0x42, 0x0a, 0x0a, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x63, 0x61, 0x73, - 0x74, 0x34, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, - 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, - 0x74, 0x79, 0x22, 0x04, 0x98, 0xb5, 0x18, 0x01, 0x12, 0x38, 0x0a, 0x0f, 0x51, 0x75, 0x6f, 0x72, - 0x75, 0x6d, 0x43, 0x61, 0x6c, 0x6c, 0x41, 0x73, 0x79, 0x6e, 0x63, 0x12, 0x0c, 0x2e, 0x64, 0x65, - 0x76, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xa0, 0xb5, 0x18, 0x01, 0xd0, 0xb5, - 0x18, 0x01, 0x12, 0x46, 0x0a, 0x19, 0x51, 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x43, 0x61, 0x6c, 0x6c, - 0x41, 0x73, 0x79, 0x6e, 0x63, 0x50, 0x65, 0x72, 0x4e, 0x6f, 0x64, 0x65, 0x41, 0x72, 0x67, 0x12, - 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, - 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x0c, 0xa0, 0xb5, - 0x18, 0x01, 0xd0, 0xb5, 0x18, 0x01, 0xa0, 0xb6, 0x18, 0x01, 0x12, 0x56, 0x0a, 0x1f, 0x51, 0x75, - 0x6f, 0x72, 0x75, 0x6d, 0x43, 0x61, 0x6c, 0x6c, 0x41, 0x73, 0x79, 0x6e, 0x63, 0x43, 0x75, 0x73, - 0x74, 0x6f, 0x6d, 0x52, 0x65, 0x74, 0x75, 0x72, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0c, 0x2e, - 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x64, 0x65, - 0x76, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x16, 0xa0, 0xb5, 0x18, 0x01, - 0xd0, 0xb5, 0x18, 0x01, 0xf2, 0xb6, 0x18, 0x0a, 0x4d, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x4f, 0x0a, 0x14, 0x51, 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x43, 0x61, 0x6c, 0x6c, - 0x41, 0x73, 0x79, 0x6e, 0x63, 0x43, 0x6f, 0x6d, 0x62, 0x6f, 0x12, 0x0c, 0x2e, 0x64, 0x65, 0x76, - 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x1a, 0xa0, 0xb5, 0x18, 0x01, 0xd0, 0xb5, 0x18, - 0x01, 0xa0, 0xb6, 0x18, 0x01, 0xf2, 0xb6, 0x18, 0x0a, 0x4d, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x39, 0x0a, 0x10, 0x51, 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x43, 0x61, 0x6c, - 0x6c, 0x41, 0x73, 0x79, 0x6e, 0x63, 0x32, 0x12, 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xa0, 0xb5, 0x18, 0x01, 0xd0, 0xb5, 0x18, 0x01, 0x12, 0x46, - 0x0a, 0x14, 0x51, 0x75, 0x6f, 0x72, 0x75, 0x6d, 0x43, 0x61, 0x6c, 0x6c, 0x41, 0x73, 0x79, 0x6e, - 0x63, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x08, 0xa0, 0xb5, - 0x18, 0x01, 0xd0, 0xb5, 0x18, 0x01, 0x12, 0x48, 0x0a, 0x15, 0x51, 0x75, 0x6f, 0x72, 0x75, 0x6d, - 0x43, 0x61, 0x6c, 0x6c, 0x41, 0x73, 0x79, 0x6e, 0x63, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x32, 0x12, - 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xa0, 0xb5, 0x18, 0x01, 0xd0, 0xb5, 0x18, 0x01, - 0x12, 0x30, 0x0a, 0x0b, 0x43, 0x6f, 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x12, - 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, - 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x04, 0xa8, 0xb5, - 0x18, 0x01, 0x12, 0x3e, 0x0a, 0x15, 0x43, 0x6f, 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, - 0x65, 0x50, 0x65, 0x72, 0x4e, 0x6f, 0x64, 0x65, 0x41, 0x72, 0x67, 0x12, 0x0c, 0x2e, 0x64, 0x65, - 0x76, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xa8, 0xb5, 0x18, 0x01, 0xa0, 0xb6, - 0x18, 0x01, 0x12, 0x4e, 0x0a, 0x1b, 0x43, 0x6f, 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, - 0x65, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x52, 0x65, 0x74, 0x75, 0x72, 0x6e, 0x54, 0x79, 0x70, - 0x65, 0x12, 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x12, - 0xa8, 0xb5, 0x18, 0x01, 0xf2, 0xb6, 0x18, 0x0a, 0x4d, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x47, 0x0a, 0x10, 0x43, 0x6f, 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, - 0x65, 0x43, 0x6f, 0x6d, 0x62, 0x6f, 0x12, 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x16, 0xa8, 0xb5, 0x18, 0x01, 0xa0, 0xb6, 0x18, 0x01, 0xf2, 0xb6, 0x18, - 0x0a, 0x4d, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3e, 0x0a, 0x10, 0x43, - 0x6f, 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, - 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, - 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, - 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x04, 0xa8, 0xb5, 0x18, 0x01, 0x12, 0x40, 0x0a, 0x11, 0x43, - 0x6f, 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x32, - 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x04, 0xa8, 0xb5, 0x18, 0x01, 0x12, 0x38, 0x0a, - 0x11, 0x43, 0x6f, 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x12, 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x04, 0xa8, 0xb5, 0x18, 0x01, 0x30, 0x01, 0x12, 0x46, 0x0a, 0x1b, 0x43, 0x6f, 0x72, 0x72, 0x65, - 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x65, 0x72, 0x4e, - 0x6f, 0x64, 0x65, 0x41, 0x72, 0x67, 0x12, 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x08, 0xa8, 0xb5, 0x18, 0x01, 0xa0, 0xb6, 0x18, 0x01, 0x30, 0x01, 0x12, - 0x56, 0x0a, 0x21, 0x43, 0x6f, 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x74, - 0x72, 0x65, 0x61, 0x6d, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x52, 0x65, 0x74, 0x75, 0x72, 0x6e, - 0x54, 0x79, 0x70, 0x65, 0x12, 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x12, 0xa8, 0xb5, 0x18, 0x01, 0xf2, 0xb6, 0x18, 0x0a, 0x4d, 0x79, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x12, 0x4f, 0x0a, 0x16, 0x43, 0x6f, 0x72, 0x72, 0x65, - 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x43, 0x6f, 0x6d, 0x62, - 0x6f, 0x12, 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x16, - 0xa8, 0xb5, 0x18, 0x01, 0xa0, 0xb6, 0x18, 0x01, 0xf2, 0xb6, 0x18, 0x0a, 0x4d, 0x79, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x12, 0x46, 0x0a, 0x16, 0x43, 0x6f, 0x72, 0x72, - 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x45, 0x6d, 0x70, - 0x74, 0x79, 0x12, 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x04, 0xa8, 0xb5, 0x18, 0x01, 0x30, 0x01, - 0x12, 0x48, 0x0a, 0x17, 0x43, 0x6f, 0x72, 0x72, 0x65, 0x63, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x53, - 0x74, 0x72, 0x65, 0x61, 0x6d, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x32, 0x12, 0x16, 0x2e, 0x67, 0x6f, - 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, - 0x70, 0x74, 0x79, 0x1a, 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x04, 0xa8, 0xb5, 0x18, 0x01, 0x30, 0x01, 0x12, 0x2c, 0x0a, 0x07, 0x55, 0x6e, - 0x69, 0x63, 0x61, 0x73, 0x74, 0x12, 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x04, 0x90, 0xb5, 0x18, 0x01, 0x12, 0x36, 0x0a, 0x08, 0x55, 0x6e, 0x69, 0x63, - 0x61, 0x73, 0x74, 0x32, 0x12, 0x0c, 0x2e, 0x64, 0x65, 0x76, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x04, 0x90, 0xb5, 0x18, 0x01, - 0x42, 0x20, 0x5a, 0x19, 0x63, 0x6d, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x2d, 0x67, - 0x65, 0x6e, 0x2d, 0x67, 0x6f, 0x72, 0x75, 0x6d, 0x73, 0x2f, 0x64, 0x65, 0x76, 0x92, 0x03, 0x02, - 0x08, 0x02, 0x62, 0x08, 0x65, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x70, 0xe8, 0x07, -}) +const file_zorums_proto_rawDesc = "" + + "\n" + + "\fzorums.proto\x12\x03dev\x1a\fgorums.proto\x1a\x1bgoogle/protobuf/empty.proto\"\x1f\n" + + "\aRequest\x12\x14\n" + + "\x05Value\x18\x01 \x01(\tR\x05Value\"\"\n" + + "\bResponse\x12\x16\n" + + "\x06Result\x18\x01 \x01(\x03R\x06Result\"\"\n" + + "\n" + + "MyResponse\x12\x14\n" + + "\x05Value\x18\x01 \x01(\tR\x05Value2\x86\x11\n" + + "\rZorumsService\x12)\n" + + "\bGRPCCall\x12\f.dev.Request\x1a\r.dev.Response\"\x00\x12/\n" + + "\n" + + "QuorumCall\x12\f.dev.Request\x1a\r.dev.Response\"\x04\xa0\xb5\x18\x01\x12=\n" + + "\x14QuorumCallPerNodeArg\x12\f.dev.Request\x1a\r.dev.Response\"\b\xa0\xb5\x18\x01\xa0\xb6\x18\x01\x12M\n" + + "\x1aQuorumCallCustomReturnType\x12\f.dev.Request\x1a\r.dev.Response\"\x12\xa0\xb5\x18\x01\xf2\xb6\x18\n" + + "MyResponse\x12F\n" + + "\x0fQuorumCallCombo\x12\f.dev.Request\x1a\r.dev.Response\"\x16\xa0\xb5\x18\x01\xa0\xb6\x18\x01\xf2\xb6\x18\n" + + "MyResponse\x12>\n" + + "\x0fQuorumCallEmpty\x12\x16.google.protobuf.Empty\x1a\r.dev.Response\"\x04\xa0\xb5\x18\x01\x12>\n" + + "\x10QuorumCallEmpty2\x12\f.dev.Request\x1a\x16.google.protobuf.Empty\"\x04\xa0\xb5\x18\x01\x12.\n" + + "\tMulticast\x12\f.dev.Request\x1a\r.dev.Response\"\x04\x98\xb5\x18\x01\x12<\n" + + "\x13MulticastPerNodeArg\x12\f.dev.Request\x1a\r.dev.Response\"\b\x98\xb5\x18\x01\xa0\xb6\x18\x01\x12/\n" + + "\n" + + "Multicast2\x12\f.dev.Request\x1a\r.dev.Response\"\x04\x98\xb5\x18\x01\x128\n" + + "\n" + + "Multicast3\x12\f.dev.Request\x1a\x16.google.protobuf.Empty\"\x04\x98\xb5\x18\x01\x12B\n" + + "\n" + + "Multicast4\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\"\x04\x98\xb5\x18\x01\x128\n" + + "\x0fQuorumCallAsync\x12\f.dev.Request\x1a\r.dev.Response\"\b\xa0\xb5\x18\x01е\x18\x01\x12F\n" + + "\x19QuorumCallAsyncPerNodeArg\x12\f.dev.Request\x1a\r.dev.Response\"\f\xa0\xb5\x18\x01е\x18\x01\xa0\xb6\x18\x01\x12V\n" + + "\x1fQuorumCallAsyncCustomReturnType\x12\f.dev.Request\x1a\r.dev.Response\"\x16\xa0\xb5\x18\x01е\x18\x01\xf2\xb6\x18\n" + + "MyResponse\x12O\n" + + "\x14QuorumCallAsyncCombo\x12\f.dev.Request\x1a\r.dev.Response\"\x1a\xa0\xb5\x18\x01е\x18\x01\xa0\xb6\x18\x01\xf2\xb6\x18\n" + + "MyResponse\x129\n" + + "\x10QuorumCallAsync2\x12\f.dev.Request\x1a\r.dev.Response\"\b\xa0\xb5\x18\x01е\x18\x01\x12F\n" + + "\x14QuorumCallAsyncEmpty\x12\f.dev.Request\x1a\x16.google.protobuf.Empty\"\b\xa0\xb5\x18\x01е\x18\x01\x12H\n" + + "\x15QuorumCallAsyncEmpty2\x12\x16.google.protobuf.Empty\x1a\r.dev.Response\"\b\xa0\xb5\x18\x01е\x18\x01\x120\n" + + "\vCorrectable\x12\f.dev.Request\x1a\r.dev.Response\"\x04\xa8\xb5\x18\x01\x12>\n" + + "\x15CorrectablePerNodeArg\x12\f.dev.Request\x1a\r.dev.Response\"\b\xa8\xb5\x18\x01\xa0\xb6\x18\x01\x12N\n" + + "\x1bCorrectableCustomReturnType\x12\f.dev.Request\x1a\r.dev.Response\"\x12\xa8\xb5\x18\x01\xf2\xb6\x18\n" + + "MyResponse\x12G\n" + + "\x10CorrectableCombo\x12\f.dev.Request\x1a\r.dev.Response\"\x16\xa8\xb5\x18\x01\xa0\xb6\x18\x01\xf2\xb6\x18\n" + + "MyResponse\x12>\n" + + "\x10CorrectableEmpty\x12\f.dev.Request\x1a\x16.google.protobuf.Empty\"\x04\xa8\xb5\x18\x01\x12@\n" + + "\x11CorrectableEmpty2\x12\x16.google.protobuf.Empty\x1a\r.dev.Response\"\x04\xa8\xb5\x18\x01\x128\n" + + "\x11CorrectableStream\x12\f.dev.Request\x1a\r.dev.Response\"\x04\xa8\xb5\x18\x010\x01\x12F\n" + + "\x1bCorrectableStreamPerNodeArg\x12\f.dev.Request\x1a\r.dev.Response\"\b\xa8\xb5\x18\x01\xa0\xb6\x18\x010\x01\x12V\n" + + "!CorrectableStreamCustomReturnType\x12\f.dev.Request\x1a\r.dev.Response\"\x12\xa8\xb5\x18\x01\xf2\xb6\x18\n" + + "MyResponse0\x01\x12O\n" + + "\x16CorrectableStreamCombo\x12\f.dev.Request\x1a\r.dev.Response\"\x16\xa8\xb5\x18\x01\xa0\xb6\x18\x01\xf2\xb6\x18\n" + + "MyResponse0\x01\x12F\n" + + "\x16CorrectableStreamEmpty\x12\f.dev.Request\x1a\x16.google.protobuf.Empty\"\x04\xa8\xb5\x18\x010\x01\x12H\n" + + "\x17CorrectableStreamEmpty2\x12\x16.google.protobuf.Empty\x1a\r.dev.Response\"\x04\xa8\xb5\x18\x010\x01\x12,\n" + + "\aUnicast\x12\f.dev.Request\x1a\r.dev.Response\"\x04\x90\xb5\x18\x01\x126\n" + + "\bUnicast2\x12\f.dev.Request\x1a\x16.google.protobuf.Empty\"\x04\x90\xb5\x18\x01B Z\x19cmd/protoc-gen-gorums/dev\x92\x03\x02\b\x02b\beditionsp\xe8\a" var file_zorums_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_zorums_proto_goTypes = []any{ diff --git a/correctable.go b/correctable.go index 309d734e..f5c95b1c 100644 --- a/correctable.go +++ b/correctable.go @@ -102,7 +102,7 @@ type correctableCallState struct { // This method should only be used by generated code. func (c RawConfiguration) CorrectableCall(ctx context.Context, d CorrectableCallData) *Correctable { expectedReplies := len(c) - md := ordering.Metadata_builder{MessageID: c.getMsgID(), Method: d.Method}.Build() + md := ordering.NewGorumsMetadata(ctx, c.getMsgID(), d.Method) replyChan := make(chan response, expectedReplies) for _, n := range c { diff --git a/multicast.go b/multicast.go index f22c6550..ba70b1ac 100644 --- a/multicast.go +++ b/multicast.go @@ -12,7 +12,7 @@ import ( // before the message has been sent. func (c RawConfiguration) Multicast(ctx context.Context, d QuorumCallData, opts ...CallOption) { o := getCallOptions(E_Multicast, opts) - md := ordering.Metadata_builder{MessageID: c.getMsgID(), Method: d.Method}.Build() + md := ordering.NewGorumsMetadata(ctx, c.getMsgID(), d.Method) sentMsgs := 0 var replyChan chan response diff --git a/ordering/gorums_metadata.go b/ordering/gorums_metadata.go new file mode 100644 index 00000000..b0640e91 --- /dev/null +++ b/ordering/gorums_metadata.go @@ -0,0 +1,38 @@ +package ordering + +import ( + "context" + + "google.golang.org/grpc/metadata" +) + +// NewGorumsMetadata creates a new Gorums metadata object for the given method +// and client message ID. It also appends any client-specific metadata from the +// context to the Gorums metadata object. +// +// This is used to pass client-specific metadata to the server via Gorums. +func NewGorumsMetadata(ctx context.Context, msgID uint64, method string) *Metadata { + gorumsMetadata := Metadata_builder{MessageID: msgID, Method: method} + md, _ := metadata.FromOutgoingContext(ctx) + for k, vv := range md { + for _, v := range vv { + entry := MetadataEntry_builder{Key: k, Value: v}.Build() + gorumsMetadata.Entry = append(gorumsMetadata.Entry, entry) + } + } + return gorumsMetadata.Build() +} + +// AppendToIncomingContext appends client-specific metadata from the +// Gorums metadata object to the incoming context. +// +// This is used to pass client-specific metadata from the Gorums runtime +// to the server implementation. +func (x *Metadata) AppendToIncomingContext(ctx context.Context) context.Context { + existingMD, _ := metadata.FromIncomingContext(ctx) + newMD := existingMD.Copy() // copy to avoid mutating the original + for _, entry := range x.GetEntry() { + newMD.Append(entry.GetKey(), entry.GetValue()) + } + return metadata.NewIncomingContext(ctx, newMD) +} diff --git a/ordering/ordering.pb.go b/ordering/ordering.pb.go index 235dcec8..ec74f30e 100644 --- a/ordering/ordering.pb.go +++ b/ordering/ordering.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.5 +// protoc-gen-go v1.36.6 // protoc v5.29.3 // source: ordering/ordering.proto @@ -28,6 +28,7 @@ type Metadata struct { xxx_hidden_MessageID uint64 `protobuf:"varint,1,opt,name=MessageID"` xxx_hidden_Method string `protobuf:"bytes,2,opt,name=Method"` xxx_hidden_Status *status.Status `protobuf:"bytes,3,opt,name=Status"` + xxx_hidden_Entry *[]*MetadataEntry `protobuf:"bytes,4,rep,name=Entry"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -78,6 +79,15 @@ func (x *Metadata) GetStatus() *status.Status { return nil } +func (x *Metadata) GetEntry() []*MetadataEntry { + if x != nil { + if x.xxx_hidden_Entry != nil { + return *x.xxx_hidden_Entry + } + } + return nil +} + func (x *Metadata) SetMessageID(v uint64) { x.xxx_hidden_MessageID = v } @@ -90,6 +100,10 @@ func (x *Metadata) SetStatus(v *status.Status) { x.xxx_hidden_Status = v } +func (x *Metadata) SetEntry(v []*MetadataEntry) { + x.xxx_hidden_Entry = &v +} + func (x *Metadata) HasStatus() bool { if x == nil { return false @@ -107,6 +121,8 @@ type Metadata_builder struct { MessageID uint64 Method string Status *status.Status + // per message client-generated metadata + Entry []*MetadataEntry } func (b0 Metadata_builder) Build() *Metadata { @@ -116,47 +132,114 @@ func (b0 Metadata_builder) Build() *Metadata { x.xxx_hidden_MessageID = b.MessageID x.xxx_hidden_Method = b.Method x.xxx_hidden_Status = b.Status + x.xxx_hidden_Entry = &b.Entry + return m0 +} + +type MetadataEntry struct { + state protoimpl.MessageState `protogen:"opaque.v1"` + xxx_hidden_Key string `protobuf:"bytes,1,opt,name=Key"` + xxx_hidden_Value string `protobuf:"bytes,2,opt,name=Value"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *MetadataEntry) Reset() { + *x = MetadataEntry{} + mi := &file_ordering_ordering_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *MetadataEntry) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MetadataEntry) ProtoMessage() {} + +func (x *MetadataEntry) ProtoReflect() protoreflect.Message { + mi := &file_ordering_ordering_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +func (x *MetadataEntry) GetKey() string { + if x != nil { + return x.xxx_hidden_Key + } + return "" +} + +func (x *MetadataEntry) GetValue() string { + if x != nil { + return x.xxx_hidden_Value + } + return "" +} + +func (x *MetadataEntry) SetKey(v string) { + x.xxx_hidden_Key = v +} + +func (x *MetadataEntry) SetValue(v string) { + x.xxx_hidden_Value = v +} + +type MetadataEntry_builder struct { + _ [0]func() // Prevents comparability and use of unkeyed literals for the builder. + + Key string + Value string +} + +func (b0 MetadataEntry_builder) Build() *MetadataEntry { + m0 := &MetadataEntry{} + b, x := &b0, m0 + _, _ = b, x + x.xxx_hidden_Key = b.Key + x.xxx_hidden_Value = b.Value return m0 } var File_ordering_ordering_proto protoreflect.FileDescriptor -var file_ordering_ordering_proto_rawDesc = string([]byte{ - 0x0a, 0x17, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2f, 0x6f, 0x72, 0x64, 0x65, 0x72, - 0x69, 0x6e, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x6f, 0x72, 0x64, 0x65, 0x72, - 0x69, 0x6e, 0x67, 0x1a, 0x17, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x72, 0x70, 0x63, 0x2f, - 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x6c, 0x0a, 0x08, - 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1c, 0x0a, 0x09, 0x4d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x65, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x4d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x49, 0x44, 0x12, 0x16, 0x0a, 0x06, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x2a, - 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, - 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x53, 0x74, 0x61, 0x74, - 0x75, 0x73, 0x52, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x32, 0x42, 0x0a, 0x06, 0x47, 0x6f, - 0x72, 0x75, 0x6d, 0x73, 0x12, 0x38, 0x0a, 0x0a, 0x4e, 0x6f, 0x64, 0x65, 0x53, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x12, 0x12, 0x2e, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x69, 0x6e, 0x67, 0x2e, 0x4d, 0x65, - 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x1a, 0x12, 0x2e, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x69, 0x6e, - 0x67, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x28, 0x01, 0x30, 0x01, 0x42, 0x27, - 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x65, 0x6c, - 0x61, 0x62, 0x2f, 0x67, 0x6f, 0x72, 0x75, 0x6d, 0x73, 0x2f, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x69, - 0x6e, 0x67, 0x92, 0x03, 0x02, 0x08, 0x02, 0x62, 0x08, 0x65, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x70, 0xe8, 0x07, -}) - -var file_ordering_ordering_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +const file_ordering_ordering_proto_rawDesc = "" + + "\n" + + "\x17ordering/ordering.proto\x12\bordering\x1a\x17google/rpc/status.proto\"\x9b\x01\n" + + "\bMetadata\x12\x1c\n" + + "\tMessageID\x18\x01 \x01(\x04R\tMessageID\x12\x16\n" + + "\x06Method\x18\x02 \x01(\tR\x06Method\x12*\n" + + "\x06Status\x18\x03 \x01(\v2\x12.google.rpc.StatusR\x06Status\x12-\n" + + "\x05Entry\x18\x04 \x03(\v2\x17.ordering.MetadataEntryR\x05Entry\"7\n" + + "\rMetadataEntry\x12\x10\n" + + "\x03Key\x18\x01 \x01(\tR\x03Key\x12\x14\n" + + "\x05Value\x18\x02 \x01(\tR\x05Value2B\n" + + "\x06Gorums\x128\n" + + "\n" + + "NodeStream\x12\x12.ordering.Metadata\x1a\x12.ordering.Metadata(\x010\x01B'Z github.com/relab/gorums/ordering\x92\x03\x02\b\x02b\beditionsp\xe8\a" + +var file_ordering_ordering_proto_msgTypes = make([]protoimpl.MessageInfo, 2) var file_ordering_ordering_proto_goTypes = []any{ (*Metadata)(nil), // 0: ordering.Metadata - (*status.Status)(nil), // 1: google.rpc.Status + (*MetadataEntry)(nil), // 1: ordering.MetadataEntry + (*status.Status)(nil), // 2: google.rpc.Status } var file_ordering_ordering_proto_depIdxs = []int32{ - 1, // 0: ordering.Metadata.Status:type_name -> google.rpc.Status - 0, // 1: ordering.Gorums.NodeStream:input_type -> ordering.Metadata - 0, // 2: ordering.Gorums.NodeStream:output_type -> ordering.Metadata - 2, // [2:3] is the sub-list for method output_type - 1, // [1:2] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name + 2, // 0: ordering.Metadata.Status:type_name -> google.rpc.Status + 1, // 1: ordering.Metadata.Entry:type_name -> ordering.MetadataEntry + 0, // 2: ordering.Gorums.NodeStream:input_type -> ordering.Metadata + 0, // 3: ordering.Gorums.NodeStream:output_type -> ordering.Metadata + 3, // [3:4] is the sub-list for method output_type + 2, // [2:3] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_ordering_ordering_proto_init() } @@ -170,7 +253,7 @@ func file_ordering_ordering_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_ordering_ordering_proto_rawDesc), len(file_ordering_ordering_proto_rawDesc)), NumEnums: 0, - NumMessages: 1, + NumMessages: 2, NumExtensions: 0, NumServices: 1, }, diff --git a/ordering/ordering.proto b/ordering/ordering.proto index ebaf96aa..60eb012f 100644 --- a/ordering/ordering.proto +++ b/ordering/ordering.proto @@ -20,4 +20,11 @@ message Metadata { uint64 MessageID = 1; string Method = 2; google.rpc.Status Status = 3; + // per message client-generated metadata + repeated MetadataEntry Entry = 4; +} + +message MetadataEntry { + string Key = 1; + string Value = 2; } diff --git a/quorumcall.go b/quorumcall.go index 40d6c7be..367a085c 100644 --- a/quorumcall.go +++ b/quorumcall.go @@ -24,7 +24,7 @@ type QuorumCallData struct { // This method should be used by generated code only. func (c RawConfiguration) QuorumCall(ctx context.Context, d QuorumCallData) (resp protoreflect.ProtoMessage, err error) { expectedReplies := len(c) - md := ordering.Metadata_builder{MessageID: c.getMsgID(), Method: d.Method}.Build() + md := ordering.NewGorumsMetadata(ctx, c.getMsgID(), d.Method) replyChan := make(chan response, expectedReplies) for _, n := range c { diff --git a/rpc.go b/rpc.go index d864030b..7cb20c80 100644 --- a/rpc.go +++ b/rpc.go @@ -19,7 +19,7 @@ type CallData struct { // // This method should be used by generated code only. func (n *RawNode) RPCCall(ctx context.Context, d CallData) (protoreflect.ProtoMessage, error) { - md := ordering.Metadata_builder{MessageID: n.mgr.getMsgID(), Method: d.Method}.Build() + md := ordering.NewGorumsMetadata(ctx, n.mgr.getMsgID(), d.Method) replyChan := make(chan response, 1) n.channel.enqueue(request{ctx: ctx, msg: &Message{Metadata: md, Message: d.Message}}, replyChan, false) diff --git a/server.go b/server.go index ea0fd0e9..8179cd09 100644 --- a/server.go +++ b/server.go @@ -95,7 +95,7 @@ func (s *orderingServer) NodeStream(srv ordering.Gorums_NodeStreamServer) error // We start the handler in a new goroutine in order to allow multiple handlers to run concurrently. // However, to preserve request ordering, the handler must unlock the shared mutex when it has either // finished, or when it is safe to start processing the next request. - go handler(ServerCtx{Context: ctx, once: new(sync.Once), mut: &mut}, req, finished) + go handler(ServerCtx{Context: req.Metadata.AppendToIncomingContext(ctx), once: new(sync.Once), mut: &mut}, req, finished) // Wait until the handler releases the mutex. mut.Lock() } diff --git a/tests/metadata/metadata_test.go b/tests/metadata/metadata_test.go index 0a2454ed..64121edf 100644 --- a/tests/metadata/metadata_test.go +++ b/tests/metadata/metadata_test.go @@ -21,15 +21,15 @@ type testSrv struct{} func (srv testSrv) IDFromMD(ctx gorums.ServerCtx, _ *emptypb.Empty) (resp *NodeID, err error) { md, ok := metadata.FromIncomingContext(ctx) if !ok { - return nil, status.Error(codes.NotFound, "Metadata unavailable") + return nil, status.Error(codes.NotFound, "metadata unavailable") } v := md.Get("id") if len(v) < 1 { - return nil, status.Error(codes.NotFound, "ID field missing") + return nil, status.Error(codes.NotFound, "missing metadata field: id") } id, err := strconv.Atoi(v[0]) if err != nil { - return nil, status.Errorf(codes.InvalidArgument, "Got '%s', but could not convert to integer", v[0]) + return nil, status.Errorf(codes.InvalidArgument, "value of id field: %q is not a number: %v", v[0], err) } return &NodeID{ID: uint32(id)}, nil } @@ -80,6 +80,38 @@ func TestMetadata(t *testing.T) { } } +func TestPerMessageMetadata(t *testing.T) { + want := uint32(1) + + addrs, teardown := gorums.TestSetup(t, 1, func(_ int) gorums.ServerIface { return initServer(t) }) + defer teardown() + + mgr := NewManager( + gorums.WithGrpcDialOptions( + grpc.WithTransportCredentials(insecure.NewCredentials()), + ), + ) + _, err := mgr.NewConfiguration(gorums.WithNodeList(addrs)) + if err != nil { + t.Fatal(err) + } + + node := mgr.Nodes()[0] + + md := metadata.New(map[string]string{ + "id": fmt.Sprint(want), + }) + ctx := metadata.NewOutgoingContext(context.Background(), md) + resp, err := node.IDFromMD(ctx, &emptypb.Empty{}) + if err != nil { + t.Fatalf("RPC error: %v", err) + } + + if resp.GetID() != want { + t.Fatalf("IDFromMD() == %d, want %d", resp.GetID(), want) + } +} + func TestPerNodeMetadata(t *testing.T) { addrs, teardown := gorums.TestSetup(t, 2, func(_ int) gorums.ServerIface { return initServer(t) }) defer teardown() diff --git a/unicast.go b/unicast.go index 53d6b1ef..5c91c3d0 100644 --- a/unicast.go +++ b/unicast.go @@ -13,7 +13,7 @@ import ( func (n *RawNode) Unicast(ctx context.Context, d CallData, opts ...CallOption) { o := getCallOptions(E_Unicast, opts) - md := ordering.Metadata_builder{MessageID: n.mgr.getMsgID(), Method: d.Method}.Build() + md := ordering.NewGorumsMetadata(ctx, n.mgr.getMsgID(), d.Method) req := request{ctx: ctx, msg: &Message{Metadata: md, Message: d.Message}, opts: o} if o.noSendWaiting { From 9d710a7c3218b0f7e4e304e8b7d11c89451f030b Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Sat, 5 Apr 2025 00:36:21 +0200 Subject: [PATCH 2/4] fix: NewServer documentation Fixes #172 --- server.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server.go b/server.go index 8179cd09..d5601f97 100644 --- a/server.go +++ b/server.go @@ -142,9 +142,7 @@ type Server struct { grpcServer *grpc.Server } -// NewServer returns a new instance of GorumsServer. -// This function is intended for internal Gorums use. -// You should call `NewServer` in the generated code instead. +// NewServer returns a new instance of [gorums.Server]. func NewServer(opts ...ServerOption) *Server { var serverOpts serverOptions for _, opt := range opts { From 331e50c93df39982f85f6ce4adf5075748fc524c Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Sat, 5 Apr 2025 00:44:00 +0200 Subject: [PATCH 3/4] docs: mark methods for use by generated code This just adds a comment to the doc for a few methods that should only be called from generated code. --- multicast.go | 2 ++ ordering/gorums_metadata.go | 2 ++ unicast.go | 2 ++ 3 files changed, 6 insertions(+) diff --git a/multicast.go b/multicast.go index ba70b1ac..1ef603a1 100644 --- a/multicast.go +++ b/multicast.go @@ -10,6 +10,8 @@ import ( // By default this function returns once the message has been sent to all nodes. // Providing the call option WithNoSendWaiting, the function may return // before the message has been sent. +// +// This method should be used by generated code only. func (c RawConfiguration) Multicast(ctx context.Context, d QuorumCallData, opts ...CallOption) { o := getCallOptions(E_Multicast, opts) md := ordering.NewGorumsMetadata(ctx, c.getMsgID(), d.Method) diff --git a/ordering/gorums_metadata.go b/ordering/gorums_metadata.go index b0640e91..cde158e9 100644 --- a/ordering/gorums_metadata.go +++ b/ordering/gorums_metadata.go @@ -11,6 +11,7 @@ import ( // context to the Gorums metadata object. // // This is used to pass client-specific metadata to the server via Gorums. +// This method should be used by generated code only. func NewGorumsMetadata(ctx context.Context, msgID uint64, method string) *Metadata { gorumsMetadata := Metadata_builder{MessageID: msgID, Method: method} md, _ := metadata.FromOutgoingContext(ctx) @@ -28,6 +29,7 @@ func NewGorumsMetadata(ctx context.Context, msgID uint64, method string) *Metada // // This is used to pass client-specific metadata from the Gorums runtime // to the server implementation. +// This method should be used by generated code only. func (x *Metadata) AppendToIncomingContext(ctx context.Context) context.Context { existingMD, _ := metadata.FromIncomingContext(ctx) newMD := existingMD.Copy() // copy to avoid mutating the original diff --git a/unicast.go b/unicast.go index 5c91c3d0..6b12235f 100644 --- a/unicast.go +++ b/unicast.go @@ -10,6 +10,8 @@ import ( // By default this function returns once the message has been sent. // Providing the call option WithNoSendWaiting, the function may return // before the message has been sent. +// +// This method should be used by generated code only. func (n *RawNode) Unicast(ctx context.Context, d CallData, opts ...CallOption) { o := getCallOptions(E_Unicast, opts) From 94f994945ec924f5fdb567279d8d3b6d211bd88a Mon Sep 17 00:00:00 2001 From: Hein Meling Date: Sat, 5 Apr 2025 00:58:55 +0200 Subject: [PATCH 4/4] chore: remove unnecessary parameter from initServer function --- tests/metadata/metadata_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/metadata/metadata_test.go b/tests/metadata/metadata_test.go index 64121edf..a92cb759 100644 --- a/tests/metadata/metadata_test.go +++ b/tests/metadata/metadata_test.go @@ -42,7 +42,7 @@ func (srv testSrv) WhatIP(ctx gorums.ServerCtx, _ *emptypb.Empty) (resp *IPAddr, return &IPAddr{Addr: peerInfo.Addr.String()}, nil } -func initServer(t *testing.T) *gorums.Server { +func initServer() *gorums.Server { srv := gorums.NewServer() RegisterMetadataTestServer(srv, &testSrv{}) return srv @@ -51,7 +51,7 @@ func initServer(t *testing.T) *gorums.Server { func TestMetadata(t *testing.T) { want := uint32(1) - addrs, teardown := gorums.TestSetup(t, 1, func(_ int) gorums.ServerIface { return initServer(t) }) + addrs, teardown := gorums.TestSetup(t, 1, func(_ int) gorums.ServerIface { return initServer() }) defer teardown() md := metadata.New(map[string]string{ @@ -83,7 +83,7 @@ func TestMetadata(t *testing.T) { func TestPerMessageMetadata(t *testing.T) { want := uint32(1) - addrs, teardown := gorums.TestSetup(t, 1, func(_ int) gorums.ServerIface { return initServer(t) }) + addrs, teardown := gorums.TestSetup(t, 1, func(_ int) gorums.ServerIface { return initServer() }) defer teardown() mgr := NewManager( @@ -113,7 +113,7 @@ func TestPerMessageMetadata(t *testing.T) { } func TestPerNodeMetadata(t *testing.T) { - addrs, teardown := gorums.TestSetup(t, 2, func(_ int) gorums.ServerIface { return initServer(t) }) + addrs, teardown := gorums.TestSetup(t, 2, func(_ int) gorums.ServerIface { return initServer() }) defer teardown() perNodeMD := func(nid uint32) metadata.MD { @@ -146,7 +146,7 @@ func TestPerNodeMetadata(t *testing.T) { } func TestCanGetPeerInfo(t *testing.T) { - addrs, teardown := gorums.TestSetup(t, 1, func(_ int) gorums.ServerIface { return initServer(t) }) + addrs, teardown := gorums.TestSetup(t, 1, func(_ int) gorums.ServerIface { return initServer() }) defer teardown() mgr := NewManager(