Skip to content
This repository was archived by the owner on Mar 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions cloudstate/crdt/flag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ func TestFlag(t *testing.T) {
})
t.Run("should return its state", func(t *testing.T) {
f := NewFlag()
// if encDecState(f.State()).GetFlag().GetValue() {
// t.Fatal("value should be false but was not")
// }
f.resetDelta()
f.Enable()
if !f.Value() {
Expand Down
1 change: 0 additions & 1 deletion cloudstate/crdt/lwwregister_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ func TestLWWRegister(t *testing.T) {
t.Fatal(err)
}
r := NewLWWRegister(foo)
// r.Set(encoding.Struct(Example{Field1: "foo"})) // TODO: this is not the same, check
bar, err := encoding.Struct(Example{Field1: "bar"})
if err != nil {
t.Fatal(err)
Expand Down
6 changes: 0 additions & 6 deletions cloudstate/crdt/ormap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,6 @@ func TestORMap(t *testing.T) {
if !contains(delta.GetOrmap().GetRemoved(), "one", "two") {
t.Fatalf("delta.removed should contain keys 'one','two' but did not: %v", delta.GetOrmap().GetRemoved())
}
// if d := m.Delta(); d != nil {
// t.Fatalf("m.Delta(): %v; want: %v", d, nil)
// }
})
t.Run("should generate an update delta", func(t *testing.T) {
m := NewORMap()
Expand All @@ -149,9 +146,6 @@ func TestORMap(t *testing.T) {
if i := entry.GetDelta().GetGcounter().GetIncrement(); i != 5 {
t.Fatalf("increment: %v; want: %v", i, 5)
}
// if d := m.Delta(); d != nil {
// t.Fatalf("m.Delta(): %v; want: %v", d, nil)
// }
})
t.Run("should generate a clear delta", func(t *testing.T) {
m := NewORMap()
Expand Down
1 change: 0 additions & 1 deletion cloudstate/crdt/orset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ func TestORSet(t *testing.T) {
if s.Size() != 1 {
t.Fatalf("s.Size(): %v; want: %v", s.Size(), 1)
}
// delta := encDecDelta(s.Delta())
s.resetDelta()
if s.HasDelta() {
t.Fatalf("set has delta")
Expand Down
1 change: 0 additions & 1 deletion cloudstate/crdt/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ type CRDT interface {
Delta() *entity.CrdtDelta
HasDelta() bool

// applyState(*entity.CrdtState) error
applyDelta(*entity.CrdtDelta) error
resetDelta()
}
23 changes: 0 additions & 23 deletions cloudstate/crdt/vote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,29 +34,6 @@ func TestVote(t *testing.T) {
t.Fatalf("v.SelfVote(): %v; want: %v", v.SelfVote(), false)
}
})
// t.Run("should reflect a state update", func(t *testing.T) {
// v := NewVote()
// err := v.applyState(encDecState(&entity.CrdtState{
// State: &entity.CrdtState_Vote{
// Vote: &entity.VoteState{
// TotalVoters: 5,
// VotesFor: 3,
// SelfVote: true,
// }},
// }))
// if err != nil {
// t.Fatal(err)
// }
// if v.VotesFor() != 3 {
// t.Fatalf("v.VotesFor(): %v; want: %v", v.VotesFor(), 3)
// }
// if v.Voters() != 5 {
// t.Fatalf("v.Voters(): %v; want: %v", v.Voters(), 5)
// }
// if v.SelfVote() != true {
// t.Fatalf("v.SelfVote(): %v; want: %v", v.SelfVote(), true)
// }
// })
t.Run("should reflect a delta update", func(t *testing.T) {
v := NewVote()
if err := v.applyDelta(encDecDelta(&entity.CrdtDelta{
Expand Down
19 changes: 11 additions & 8 deletions protobuf/tck/tck_crdt2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@ option go_package = "github.com/cloudstateio/go-support/tck/crdt2;crdt2";
// - The `ProcessStreamed` method must stream the current state in a `Response`, on any changes.
// - A `StreamedRequest` message may have an end state, an update to apply on stream cancellation, or side effects.
service CrdtTckModel {
rpc Process (Request) returns (Response);
rpc ProcessStreamed (StreamedRequest) returns (stream Response);
rpc Process(Request) returns (Response);
rpc ProcessStreamed(StreamedRequest) returns (stream Response);
}

//
// The `CrdtTwo` service is only for verifying forwards and side effects.
// The `Call` method is not required to do anything, and may simply return an empty `Response` message.
//
service CrdtTwo {
rpc Call (Request) returns (Response);
rpc Call(Request) returns (Response);
}

//
Expand All @@ -68,12 +68,17 @@ message Request {
// If `end_state` is set, it specifies a target state for ending the stream.
// If `cancel_update` is set, it specifies an update to apply when the stream is cancelled.
// If `effects` is set, it specifies side effects to return with every streamed response.
// If `initial_update` is set, it specifies an update to apply on the initial request.
// If `empty` is set, then no responses should be streamed (for testing empty stream connections).
// Otherwise, the current state should be streamed on changes.
//
message StreamedRequest {
string id = 1 [(.cloudstate.entity_key) = true];
State end_state = 2;
Update cancel_update = 3;
repeated Effect effects = 4;
Update initial_update = 5;
bool empty = 6;
}

//
Expand Down Expand Up @@ -174,8 +179,7 @@ enum LWWRegisterClockType {
//
// Update a Flag CRDT by enabling it.
//
message FlagUpdate {
}
message FlagUpdate {}

//
// Update an ORMap CRDT by adding, updating, or removing entries, or clearing the map.
Expand Down Expand Up @@ -217,8 +221,7 @@ enum UpdateWriteConsistency {
//
// Delete the CRDT.
//
message Delete {
}
message Delete {}

//
// Replace the response with a forward to `cloudstate.tck.model.CrdtTwo/Call`.
Expand Down Expand Up @@ -336,4 +339,4 @@ message VoteValue {
bool self_vote = 1;
int32 votes_for = 2;
int32 total_voters = 3;
}
}
41 changes: 29 additions & 12 deletions tck/crdt2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ func NewCrdtTwoEntity(id crdt.EntityID) crdt.EntityHandler {
return &CrdtTwoEntity{}
}

func (e *CrdtTckModelEntity) HandleCommand(ctx *crdt.CommandContext, name string, msg proto.Message) (*any.Any, error) {
func (e *CrdtTckModelEntity) processStreamed(ctx *crdt.CommandContext, name string, msg proto.Message) (*any.Any, error) {
r, ok := msg.(*StreamedRequest)
if !ok {
return nil, nil
}
fmt.Printf("processStreamed: %+v\n", ctx.EntityID)
if ctx.Streamed() {
r, ok := msg.(*StreamedRequest)
if !ok {
return nil, nil
}
ctx.ChangeFunc(func(c *crdt.CommandContext) (*any.Any, error) {
for _, effect := range r.GetEffects() {
req, err := encoding.MarshalAny(&Request{
Expand All @@ -79,6 +80,9 @@ func (e *CrdtTckModelEntity) HandleCommand(ctx *crdt.CommandContext, name string
ctx.EndStream()
}
}
if r.GetEmpty() {
return nil, nil
}
state, err := crdtState(c.CRDT())
if err != nil {
return nil, err
Expand All @@ -87,20 +91,33 @@ func (e *CrdtTckModelEntity) HandleCommand(ctx *crdt.CommandContext, name string
State: state,
})
})
if r.GetCancelUpdate() != nil {
if u := r.GetCancelUpdate(); u != nil {
ctx.CancelFunc(func(c *crdt.CommandContext) error {
return applyUpdate(c.CRDT(), r.GetCancelUpdate())
return applyUpdate(c.CRDT(), u)
})
}
state, err := crdtState(ctx.CRDT())
if err != nil {
}
if u := r.GetInitialUpdate(); u != nil {
if err := applyUpdate(ctx.CRDT(), u); err != nil {
return nil, err
}
return encoding.MarshalAny(&Response{
State: state,
})
}
if r.GetEmpty() {
return nil, nil
}
state, err := crdtState(ctx.CRDT())
if err != nil {
return nil, err
}
return encoding.MarshalAny(&Response{
State: state,
})
}

func (e *CrdtTckModelEntity) HandleCommand(ctx *crdt.CommandContext, name string, msg proto.Message) (*any.Any, error) {
if name == "ProcessStreamed" {
return e.processStreamed(ctx, name, msg)
}
r, ok := msg.(*Request)
if !ok {
return nil, nil
Expand Down
120 changes: 73 additions & 47 deletions tck/crdt2/tck_crdt2.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.