Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion async.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type asyncCallState struct {
// This function should only be used by generated code.
func (c RawConfiguration) AsyncCall(ctx context.Context, d QuorumCallData) *Async {
expectedReplies := len(c)
md := ordering.Metadata_builder{MessageID: c.getMsgID(), Method: d.Method}.Build()
md := ordering.NewGorumsMetadata(ctx, c.getMsgID(), d.Method)
replyChan := make(chan response, expectedReplies)

for _, n := range c {
Expand Down
24 changes: 14 additions & 10 deletions channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,17 @@ func TestChannelCreation(t *testing.T) {
if err != nil {
t.Fatal(err)
}
//the node should be closed manually because it isn't added to the configuration
// the node should be closed manually because it isn't added to the configuration
defer node.close()
mgr := dummyMgr()
// a proper connection should NOT be established here
node.connect(mgr)

replyChan := make(chan response, 1)
go func() {
md := ordering.Metadata_builder{MessageID: 1, Method: handlerName}.Build()
req := request{ctx: context.Background(), msg: &Message{Metadata: md, Message: &mock.Request{}}}
ctx := context.Background()
md := ordering.NewGorumsMetadata(ctx, 1, handlerName)
req := request{ctx: ctx, msg: &Message{Metadata: md, Message: &mock.Request{}}}
node.channel.enqueue(req, replyChan, false)
}()
select {
Expand Down Expand Up @@ -118,7 +119,7 @@ func TestChannelReconnection(t *testing.T) {
if err != nil {
t.Fatal(err)
}
//the node should be closed manually because it isn't added to the configuration
// the node should be closed manually because it isn't added to the configuration
defer node.close()
mgr := dummyMgr()
// a proper connection should NOT be established here because server is not started
Expand All @@ -127,8 +128,9 @@ func TestChannelReconnection(t *testing.T) {
// send first message when server is down
replyChan1 := make(chan response, 1)
go func() {
md := ordering.Metadata_builder{MessageID: 1, Method: handlerName}.Build()
req := request{ctx: context.Background(), msg: &Message{Metadata: md, Message: &mock.Request{}}}
ctx := context.Background()
md := ordering.NewGorumsMetadata(ctx, 1, handlerName)
req := request{ctx: ctx, msg: &Message{Metadata: md, Message: &mock.Request{}}}
node.channel.enqueue(req, replyChan1, false)
}()

Expand All @@ -147,8 +149,9 @@ func TestChannelReconnection(t *testing.T) {
// send second message when server is up
replyChan2 := make(chan response, 1)
go func() {
md := ordering.Metadata_builder{MessageID: 2, Method: handlerName}.Build()
req := request{ctx: context.Background(), msg: &Message{Metadata: md, Message: &mock.Request{}}, opts: getCallOptions(E_Multicast, nil)}
ctx := context.Background()
md := ordering.NewGorumsMetadata(ctx, 2, handlerName)
req := request{ctx: ctx, msg: &Message{Metadata: md, Message: &mock.Request{}}, opts: getCallOptions(E_Multicast, nil)}
node.channel.enqueue(req, replyChan2, false)
}()

Expand All @@ -167,8 +170,9 @@ func TestChannelReconnection(t *testing.T) {
// send third message when server has been previously up but is now down
replyChan3 := make(chan response, 1)
go func() {
md := ordering.Metadata_builder{MessageID: 3, Method: handlerName}.Build()
req := request{ctx: context.Background(), msg: &Message{Metadata: md, Message: &mock.Request{}}}
ctx := context.Background()
md := ordering.NewGorumsMetadata(ctx, 3, handlerName)
req := request{ctx: ctx, msg: &Message{Metadata: md, Message: &mock.Request{}}}
node.channel.enqueue(req, replyChan3, false)
}()

Expand Down
210 changes: 57 additions & 153 deletions cmd/protoc-gen-gorums/dev/zorums.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion correctable.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type correctableCallState struct {
// This method should only be used by generated code.
func (c RawConfiguration) CorrectableCall(ctx context.Context, d CorrectableCallData) *Correctable {
expectedReplies := len(c)
md := ordering.Metadata_builder{MessageID: c.getMsgID(), Method: d.Method}.Build()
md := ordering.NewGorumsMetadata(ctx, c.getMsgID(), d.Method)

replyChan := make(chan response, expectedReplies)
for _, n := range c {
Expand Down
4 changes: 3 additions & 1 deletion multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
// By default this function returns once the message has been sent to all nodes.
// Providing the call option WithNoSendWaiting, the function may return
// before the message has been sent.
//
// This method should be used by generated code only.
func (c RawConfiguration) Multicast(ctx context.Context, d QuorumCallData, opts ...CallOption) {
o := getCallOptions(E_Multicast, opts)
md := ordering.Metadata_builder{MessageID: c.getMsgID(), Method: d.Method}.Build()
md := ordering.NewGorumsMetadata(ctx, c.getMsgID(), d.Method)
sentMsgs := 0

var replyChan chan response
Expand Down
40 changes: 40 additions & 0 deletions ordering/gorums_metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package ordering

import (
"context"

"google.golang.org/grpc/metadata"
)

// NewGorumsMetadata creates a new Gorums metadata object for the given method
// and client message ID. It also appends any client-specific metadata from the
// context to the Gorums metadata object.
//
// This is used to pass client-specific metadata to the server via Gorums.
// This method should be used by generated code only.
func NewGorumsMetadata(ctx context.Context, msgID uint64, method string) *Metadata {
gorumsMetadata := Metadata_builder{MessageID: msgID, Method: method}
md, _ := metadata.FromOutgoingContext(ctx)
for k, vv := range md {
for _, v := range vv {
entry := MetadataEntry_builder{Key: k, Value: v}.Build()
gorumsMetadata.Entry = append(gorumsMetadata.Entry, entry)
}
}
return gorumsMetadata.Build()
}

// AppendToIncomingContext appends client-specific metadata from the
// Gorums metadata object to the incoming context.
//
// This is used to pass client-specific metadata from the Gorums runtime
// to the server implementation.
// This method should be used by generated code only.
func (x *Metadata) AppendToIncomingContext(ctx context.Context) context.Context {
existingMD, _ := metadata.FromIncomingContext(ctx)
newMD := existingMD.Copy() // copy to avoid mutating the original
for _, entry := range x.GetEntry() {
newMD.Append(entry.GetKey(), entry.GetValue())
}
return metadata.NewIncomingContext(ctx, newMD)
}
151 changes: 117 additions & 34 deletions ordering/ordering.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions ordering/ordering.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,11 @@ message Metadata {
uint64 MessageID = 1;
string Method = 2;
google.rpc.Status Status = 3;
// per message client-generated metadata
repeated MetadataEntry Entry = 4;
}

message MetadataEntry {
string Key = 1;
string Value = 2;
}
2 changes: 1 addition & 1 deletion quorumcall.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type QuorumCallData struct {
// This method should be used by generated code only.
func (c RawConfiguration) QuorumCall(ctx context.Context, d QuorumCallData) (resp protoreflect.ProtoMessage, err error) {
expectedReplies := len(c)
md := ordering.Metadata_builder{MessageID: c.getMsgID(), Method: d.Method}.Build()
md := ordering.NewGorumsMetadata(ctx, c.getMsgID(), d.Method)

replyChan := make(chan response, expectedReplies)
for _, n := range c {
Expand Down
2 changes: 1 addition & 1 deletion rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type CallData struct {
//
// This method should be used by generated code only.
func (n *RawNode) RPCCall(ctx context.Context, d CallData) (protoreflect.ProtoMessage, error) {
md := ordering.Metadata_builder{MessageID: n.mgr.getMsgID(), Method: d.Method}.Build()
md := ordering.NewGorumsMetadata(ctx, n.mgr.getMsgID(), d.Method)
replyChan := make(chan response, 1)
n.channel.enqueue(request{ctx: ctx, msg: &Message{Metadata: md, Message: d.Message}}, replyChan, false)

Expand Down
6 changes: 2 additions & 4 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (s *orderingServer) NodeStream(srv ordering.Gorums_NodeStreamServer) error
// We start the handler in a new goroutine in order to allow multiple handlers to run concurrently.
// However, to preserve request ordering, the handler must unlock the shared mutex when it has either
// finished, or when it is safe to start processing the next request.
go handler(ServerCtx{Context: ctx, once: new(sync.Once), mut: &mut}, req, finished)
go handler(ServerCtx{Context: req.Metadata.AppendToIncomingContext(ctx), once: new(sync.Once), mut: &mut}, req, finished)
// Wait until the handler releases the mutex.
mut.Lock()
}
Expand Down Expand Up @@ -142,9 +142,7 @@ type Server struct {
grpcServer *grpc.Server
}

// NewServer returns a new instance of GorumsServer.
// This function is intended for internal Gorums use.
// You should call `NewServer` in the generated code instead.
// NewServer returns a new instance of [gorums.Server].
func NewServer(opts ...ServerOption) *Server {
var serverOpts serverOptions
for _, opt := range opts {
Expand Down
Loading