diff --git a/foreign/go/binary_serialization/binary_request_serializer.go b/foreign/go/binary_serialization/binary_request_serializer.go deleted file mode 100644 index 412aae18ca..0000000000 --- a/foreign/go/binary_serialization/binary_request_serializer.go +++ /dev/null @@ -1,410 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package binaryserialization - -import ( - "encoding/binary" - - iggcon "github.com/apache/iggy/foreign/go/contracts" -) - -func CreateGroup(request iggcon.CreateConsumerGroupRequest) []byte { - streamIdBytes := SerializeIdentifier(request.StreamId) - topicIdBytes := SerializeIdentifier(request.TopicId) - offset := len(streamIdBytes) + len(topicIdBytes) - bytes := make([]byte, offset+1+len(request.Name)) - copy(bytes[0:len(streamIdBytes)], streamIdBytes) - copy(bytes[len(streamIdBytes):offset], topicIdBytes) - bytes[offset] = byte(len(request.Name)) - copy(bytes[offset+1:], request.Name) - return bytes -} - -func UpdateOffset(request iggcon.StoreConsumerOffsetRequest) []byte { - hasPartition := byte(0) - var partition uint32 = 0 - if request.PartitionId != nil { - hasPartition = 1 - partition = *request.PartitionId - } - consumerBytes := SerializeConsumer(request.Consumer) - streamIdBytes := SerializeIdentifier(request.StreamId) - topicIdBytes := SerializeIdentifier(request.TopicId) - // consumer + stream_id + topic_id + hasPartition(1) + partition(4) + offset(8) - bytes := make([]byte, len(consumerBytes)+len(streamIdBytes)+len(topicIdBytes)+13) - position := 0 - copy(bytes[position:], consumerBytes) - position += len(consumerBytes) - copy(bytes[position:], streamIdBytes) - position += len(streamIdBytes) - copy(bytes[position:], topicIdBytes) - position += len(topicIdBytes) - bytes[position] = hasPartition - binary.LittleEndian.PutUint32(bytes[position+1:position+5], partition) - binary.LittleEndian.PutUint64(bytes[position+5:position+13], uint64(request.Offset)) - return bytes -} - -func GetOffset(request iggcon.GetConsumerOffsetRequest) []byte { - hasPartition := byte(0) - var partition uint32 = 0 - if request.PartitionId != nil { - hasPartition = 1 - partition = *request.PartitionId - } - consumerBytes := SerializeConsumer(request.Consumer) - streamIdBytes := SerializeIdentifier(request.StreamId) - topicIdBytes := SerializeIdentifier(request.TopicId) - // consumer + stream_id + topic_id + hasPartition(1) + partition(4) - bytes := make([]byte, len(consumerBytes)+len(streamIdBytes)+len(topicIdBytes)+5) - position := 0 - copy(bytes[position:], consumerBytes) - position += len(consumerBytes) - copy(bytes[position:], streamIdBytes) - position += len(streamIdBytes) - copy(bytes[position:], topicIdBytes) - position += len(topicIdBytes) - bytes[position] = hasPartition - binary.LittleEndian.PutUint32(bytes[position+1:position+5], partition) - return bytes -} - -func DeleteOffset(request iggcon.DeleteConsumerOffsetRequest) []byte { - hasPartition := byte(0) - var partition uint32 = 0 - if request.PartitionId != nil { - hasPartition = 1 - partition = *request.PartitionId - } - consumerBytes := SerializeConsumer(request.Consumer) - streamIdBytes := SerializeIdentifier(request.StreamId) - topicIdBytes := SerializeIdentifier(request.TopicId) - // consumer + stream_id + topic_id + hasPartition(1) + partition(4) - bytes := make([]byte, len(consumerBytes)+len(streamIdBytes)+len(topicIdBytes)+5) - position := 0 - copy(bytes[position:], consumerBytes) - position += len(consumerBytes) - copy(bytes[position:], streamIdBytes) - position += len(streamIdBytes) - copy(bytes[position:], topicIdBytes) - position += len(topicIdBytes) - bytes[position] = hasPartition - binary.LittleEndian.PutUint32(bytes[position+1:position+5], partition) - return bytes -} - -func CreatePartitions(request iggcon.CreatePartitionsRequest) []byte { - streamIdBytes := SerializeIdentifier(request.StreamId) - topicIdBytes := SerializeIdentifier(request.TopicId) - bytes := make([]byte, len(streamIdBytes)+len(topicIdBytes)+4) - position := 0 - copy(bytes[position:], streamIdBytes) - position += len(streamIdBytes) - copy(bytes[position:], topicIdBytes) - position += len(topicIdBytes) - binary.LittleEndian.PutUint32(bytes[position:position+4], uint32(request.PartitionsCount)) - - return bytes -} - -func DeletePartitions(request iggcon.DeletePartitionsRequest) []byte { - streamIdBytes := SerializeIdentifier(request.StreamId) - topicIdBytes := SerializeIdentifier(request.TopicId) - bytes := make([]byte, len(streamIdBytes)+len(topicIdBytes)+4) - position := 0 - copy(bytes[position:], streamIdBytes) - position += len(streamIdBytes) - copy(bytes[position:], topicIdBytes) - position += len(topicIdBytes) - binary.LittleEndian.PutUint32(bytes[position:position+4], uint32(request.PartitionsCount)) - - return bytes -} - -//USERS - -func SerializeCreateUserRequest(request iggcon.CreateUserRequest) []byte { - capacity := 4 + len(request.Username) + len(request.Password) - if request.Permissions != nil { - capacity += 1 + 4 + CalculatePermissionsSize(request.Permissions) - } - - bytes := make([]byte, capacity) - position := 0 - - bytes[position] = byte(len(request.Username)) - position += 1 - copy(bytes[position:position+len(request.Username)], []byte(request.Username)) - position += len(request.Username) - - bytes[position] = byte(len(request.Password)) - position += 1 - copy(bytes[position:position+len(request.Password)], []byte(request.Password)) - position += len(request.Password) - - statusByte := byte(0) - switch request.Status { - case iggcon.Active: - statusByte = byte(1) - case iggcon.Inactive: - statusByte = byte(2) - } - bytes[position] = statusByte - position += 1 - - if request.Permissions != nil { - bytes[position] = byte(1) - position += 1 - permissionsBytes := GetBytesFromPermissions(request.Permissions) - binary.LittleEndian.PutUint32(bytes[position:position+4], uint32(len(permissionsBytes))) - position += 4 - copy(bytes[position:position+len(permissionsBytes)], permissionsBytes) - } else { - bytes[position] = byte(0) - } - - return bytes -} - -func GetBytesFromPermissions(data *iggcon.Permissions) []byte { - size := CalculatePermissionsSize(data) - bytes := make([]byte, size) - - bytes[0] = boolToByte(data.Global.ManageServers) - bytes[1] = boolToByte(data.Global.ReadServers) - bytes[2] = boolToByte(data.Global.ManageUsers) - bytes[3] = boolToByte(data.Global.ReadUsers) - bytes[4] = boolToByte(data.Global.ManageStreams) - bytes[5] = boolToByte(data.Global.ReadStreams) - bytes[6] = boolToByte(data.Global.ManageTopics) - bytes[7] = boolToByte(data.Global.ReadTopics) - bytes[8] = boolToByte(data.Global.PollMessages) - bytes[9] = boolToByte(data.Global.SendMessages) - - position := 10 - - if data.Streams != nil { - bytes[position] = byte(1) - position += 1 - - for streamID, stream := range data.Streams { - binary.LittleEndian.PutUint32(bytes[position:position+4], uint32(streamID)) - position += 4 - - bytes[position] = boolToByte(stream.ManageStream) - bytes[position+1] = boolToByte(stream.ReadStream) - bytes[position+2] = boolToByte(stream.ManageTopics) - bytes[position+3] = boolToByte(stream.ReadTopics) - bytes[position+4] = boolToByte(stream.PollMessages) - bytes[position+5] = boolToByte(stream.SendMessages) - position += 6 - - if stream.Topics != nil { - bytes[position] = byte(1) - position += 1 - - for topicID, topic := range stream.Topics { - binary.LittleEndian.PutUint32(bytes[position:position+4], uint32(topicID)) - position += 4 - - bytes[position] = boolToByte(topic.ManageTopic) - bytes[position+1] = boolToByte(topic.ReadTopic) - bytes[position+2] = boolToByte(topic.PollMessages) - bytes[position+3] = boolToByte(topic.SendMessages) - position += 4 - - bytes[position] = byte(0) - position += 1 - } - } else { - bytes[position] = byte(0) - position += 1 - } - } - } else { - bytes[0] = byte(0) - } - - return bytes -} - -func CalculatePermissionsSize(data *iggcon.Permissions) int { - size := 10 - - if data.Streams != nil { - size += 1 - - for _, stream := range data.Streams { - size += 4 - size += 6 - size += 1 - - if stream.Topics != nil { - size += 1 - size += len(stream.Topics) * 9 - } else { - size += 1 - } - } - } else { - size += 1 - } - - return size -} - -func boolToByte(b bool) byte { - if b { - return 1 - } - return 0 -} - -func SerializeUpdateUser(request iggcon.UpdateUserRequest) []byte { - userIdBytes := SerializeIdentifier(request.UserID) - length := len(userIdBytes) - - if request.Username == nil { - request.Username = new(string) - } - - username := *request.Username - - if len(username) != 0 { - length += 2 + len(username) - } - - if request.Status != nil { - length += 2 - } - - bytes := make([]byte, length+1) - position := 0 - - copy(bytes[position:position+len(userIdBytes)], userIdBytes) - position += len(userIdBytes) - - if len(username) != 0 { - bytes[position] = 1 - position++ - bytes[position] = byte(len(username)) - position++ - copy(bytes[position:position+len(username)], username) - position += len(username) - } else { - bytes[position] = 0 - position++ - } - - if request.Status != nil { - bytes[position] = 1 - position++ - statusByte := byte(0) - switch *request.Status { - case iggcon.Active: - statusByte = 1 - case iggcon.Inactive: - statusByte = 2 - } - bytes[position] = statusByte - } else { - bytes[position] = 0 - } - - return bytes -} - -func SerializeChangePasswordRequest(request iggcon.ChangePasswordRequest) []byte { - userIdBytes := SerializeIdentifier(request.UserID) - length := len(userIdBytes) + len(request.CurrentPassword) + len(request.NewPassword) + 2 - bytes := make([]byte, length) - position := 0 - - copy(bytes[position:position+len(userIdBytes)], userIdBytes) - position += len(userIdBytes) - - bytes[position] = byte(len(request.CurrentPassword)) - position++ - copy(bytes[position:position+len(request.CurrentPassword)], []byte(request.CurrentPassword)) - position += len(request.CurrentPassword) - - bytes[position] = byte(len(request.NewPassword)) - position++ - copy(bytes[position:position+len(request.NewPassword)], []byte(request.NewPassword)) - - return bytes -} - -func SerializeUpdateUserPermissionsRequest(request iggcon.UpdatePermissionsRequest) []byte { - userIdBytes := SerializeIdentifier(request.UserID) - length := len(userIdBytes) - - if request.Permissions != nil { - length += 1 + 4 + CalculatePermissionsSize(request.Permissions) - } - - bytes := make([]byte, length) - position := 0 - - copy(bytes[position:position+len(userIdBytes)], userIdBytes) - position += len(userIdBytes) - - if request.Permissions != nil { - bytes[position] = 1 - position++ - permissionsBytes := GetBytesFromPermissions(request.Permissions) - binary.LittleEndian.PutUint32(bytes[position:position+4], uint32(len(permissionsBytes))) - position += 4 - copy(bytes[position:position+len(permissionsBytes)], permissionsBytes) - } else { - bytes[position] = 0 - } - - return bytes -} - -func SerializeUint32(value uint32) []byte { - bytes := make([]byte, 4) - binary.LittleEndian.PutUint32(bytes, value) - return bytes -} - -func SerializeLoginWithPersonalAccessToken(request iggcon.LoginWithPersonalAccessTokenRequest) []byte { - length := 1 + len(request.Token) - bytes := make([]byte, length) - bytes[0] = byte(len(request.Token)) - copy(bytes[1:], []byte(request.Token)) - return bytes -} - -func SerializeDeletePersonalAccessToken(request iggcon.DeletePersonalAccessTokenRequest) []byte { - length := 1 + len(request.Name) - bytes := make([]byte, length) - bytes[0] = byte(len(request.Name)) - copy(bytes[1:], []byte(request.Name)) - return bytes -} - -func SerializeCreatePersonalAccessToken(request iggcon.CreatePersonalAccessTokenRequest) []byte { - length := 1 + len(request.Name) + 8 - bytes := make([]byte, length) - bytes[0] = byte(len(request.Name)) - copy(bytes[1:], []byte(request.Name)) - binary.LittleEndian.PutUint32(bytes[len(bytes)-4:], request.Expiry) - return bytes -} diff --git a/foreign/go/binary_serialization/binary_response_deserializer.go b/foreign/go/binary_serialization/binary_response_deserializer.go index 033de643b8..81bd35153f 100644 --- a/foreign/go/binary_serialization/binary_response_deserializer.go +++ b/foreign/go/binary_serialization/binary_response_deserializer.go @@ -545,12 +545,12 @@ func DeserializeClient(payload []byte) *iggcon.ClientInfoDetails { for i := uint32(0); i < clientInfo.ConsumerGroupsCount; i++ { streamId := binary.LittleEndian.Uint32(payload[position : position+4]) topicId := binary.LittleEndian.Uint32(payload[position+4 : position+8]) - consumerGroupId := binary.LittleEndian.Uint32(payload[position+8 : position+12]) + groupId := binary.LittleEndian.Uint32(payload[position+8 : position+12]) consumerGroup := iggcon.ConsumerGroupInfo{ - StreamId: streamId, - TopicId: topicId, - ConsumerGroupId: consumerGroupId, + StreamId: streamId, + TopicId: topicId, + GroupId: groupId, } consumerGroups = append(consumerGroups, consumerGroup) position += 12 diff --git a/foreign/go/binary_serialization/identifier_serializer.go b/foreign/go/binary_serialization/identifier_serializer.go deleted file mode 100644 index 91b1efee26..0000000000 --- a/foreign/go/binary_serialization/identifier_serializer.go +++ /dev/null @@ -1,54 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package binaryserialization - -import ( - iggcon "github.com/apache/iggy/foreign/go/contracts" -) - -func SerializeIdentifier(identifier iggcon.Identifier) []byte { - bytes := make([]byte, identifier.Length+2) - bytes[0] = byte(identifier.Kind) - bytes[1] = byte(identifier.Length) - copy(bytes[2:], identifier.Value) - return bytes -} - -func SerializeIdentifiers(identifiers ...iggcon.Identifier) []byte { - size := 0 - for i := 0; i < len(identifiers); i++ { - size += 2 + identifiers[i].Length - } - bytes := make([]byte, size) - position := 0 - - for i := 0; i < len(identifiers); i++ { - copy(bytes[position:position+2+identifiers[i].Length], SerializeIdentifier(identifiers[i])) - position += 2 + identifiers[i].Length - } - - return bytes -} - -func SerializePartitioning(partitioning iggcon.Partitioning) []byte { - bytes := make([]byte, 2+partitioning.Length) - bytes[0] = byte(partitioning.Kind) - bytes[1] = byte(partitioning.Length) - copy(bytes[2:], partitioning.Value) - return bytes -} diff --git a/foreign/go/binary_serialization/update_topic_serializer.go b/foreign/go/binary_serialization/update_topic_serializer.go deleted file mode 100644 index 2005cb080b..0000000000 --- a/foreign/go/binary_serialization/update_topic_serializer.go +++ /dev/null @@ -1,68 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package binaryserialization - -import ( - "encoding/binary" - - iggcon "github.com/apache/iggy/foreign/go/contracts" -) - -type TcpUpdateTopicRequest struct { - StreamId iggcon.Identifier `json:"streamId"` - TopicId iggcon.Identifier `json:"topicId"` - CompressionAlgorithm iggcon.CompressionAlgorithm `json:"compressionAlgorithm"` - MessageExpiry iggcon.Duration `json:"messageExpiry"` - MaxTopicSize uint64 `json:"maxTopicSize"` - ReplicationFactor *uint8 `json:"replicationFactor"` - Name string `json:"name"` -} - -func (request *TcpUpdateTopicRequest) Serialize() []byte { - if request.ReplicationFactor == nil { - request.ReplicationFactor = new(uint8) - } - streamIdBytes := SerializeIdentifier(request.StreamId) - topicIdBytes := SerializeIdentifier(request.TopicId) - - buffer := make([]byte, 19+len(streamIdBytes)+len(topicIdBytes)+len(request.Name)) - - offset := 0 - - offset += copy(buffer[offset:], streamIdBytes) - offset += copy(buffer[offset:], topicIdBytes) - - buffer[offset] = byte(request.CompressionAlgorithm) - offset++ - - binary.LittleEndian.PutUint64(buffer[offset:], uint64(request.MessageExpiry)) - offset += 8 - - binary.LittleEndian.PutUint64(buffer[offset:], request.MaxTopicSize) - offset += 8 - - buffer[offset] = *request.ReplicationFactor - offset++ - - buffer[offset] = uint8(len(request.Name)) - offset++ - - copy(buffer[offset:], request.Name) - - return buffer -} diff --git a/foreign/go/client/tcp/tcp_access_token_management.go b/foreign/go/client/tcp/tcp_access_token_management.go index 84f67a4984..fdb487644c 100644 --- a/foreign/go/client/tcp/tcp_access_token_management.go +++ b/foreign/go/client/tcp/tcp_access_token_management.go @@ -23,11 +23,10 @@ import ( ) func (c *IggyTcpClient) CreatePersonalAccessToken(name string, expiry uint32) (*iggcon.RawPersonalAccessToken, error) { - message := binaryserialization.SerializeCreatePersonalAccessToken(iggcon.CreatePersonalAccessTokenRequest{ + buffer, err := c.do(&iggcon.CreatePersonalAccessToken{ Name: name, Expiry: expiry, }) - buffer, err := c.sendAndFetchResponse(message, iggcon.CreateAccessTokenCode) if err != nil { return nil, err } @@ -36,15 +35,14 @@ func (c *IggyTcpClient) CreatePersonalAccessToken(name string, expiry uint32) (* } func (c *IggyTcpClient) DeletePersonalAccessToken(name string) error { - message := binaryserialization.SerializeDeletePersonalAccessToken(iggcon.DeletePersonalAccessTokenRequest{ + _, err := c.do(&iggcon.DeletePersonalAccessToken{ Name: name, }) - _, err := c.sendAndFetchResponse(message, iggcon.DeleteAccessTokenCode) return err } func (c *IggyTcpClient) GetPersonalAccessTokens() ([]iggcon.PersonalAccessTokenInfo, error) { - buffer, err := c.sendAndFetchResponse([]byte{}, iggcon.GetAccessTokensCode) + buffer, err := c.do(&iggcon.GetPersonalAccessTokens{}) if err != nil { return nil, err } diff --git a/foreign/go/client/tcp/tcp_clients_management.go b/foreign/go/client/tcp/tcp_clients_management.go index ea0dddd8df..cb93d66e89 100644 --- a/foreign/go/client/tcp/tcp_clients_management.go +++ b/foreign/go/client/tcp/tcp_clients_management.go @@ -23,7 +23,7 @@ import ( ) func (c *IggyTcpClient) GetClients() ([]iggcon.ClientInfo, error) { - buffer, err := c.sendAndFetchResponse([]byte{}, iggcon.GetClientsCode) + buffer, err := c.do(&iggcon.GetClients{}) if err != nil { return nil, err } @@ -32,8 +32,7 @@ func (c *IggyTcpClient) GetClients() ([]iggcon.ClientInfo, error) { } func (c *IggyTcpClient) GetClient(clientId uint32) (*iggcon.ClientInfoDetails, error) { - message := binaryserialization.SerializeUint32(clientId) - buffer, err := c.sendAndFetchResponse(message, iggcon.GetClientCode) + buffer, err := c.do(&iggcon.GetClient{ClientID: clientId}) if err != nil { return nil, err } diff --git a/foreign/go/client/tcp/tcp_consumer_group_management.go b/foreign/go/client/tcp/tcp_consumer_group_management.go index 83a85a6671..deec1fe910 100644 --- a/foreign/go/client/tcp/tcp_consumer_group_management.go +++ b/foreign/go/client/tcp/tcp_consumer_group_management.go @@ -24,8 +24,10 @@ import ( ) func (c *IggyTcpClient) GetConsumerGroups(streamId, topicId iggcon.Identifier) ([]iggcon.ConsumerGroup, error) { - message := binaryserialization.SerializeIdentifiers(streamId, topicId) - buffer, err := c.sendAndFetchResponse(message, iggcon.GetGroupsCode) + buffer, err := c.do(&iggcon.GetConsumerGroups{ + StreamId: streamId, + TopicId: topicId, + }) if err != nil { return nil, err } @@ -34,8 +36,13 @@ func (c *IggyTcpClient) GetConsumerGroups(streamId, topicId iggcon.Identifier) ( } func (c *IggyTcpClient) GetConsumerGroup(streamId, topicId, groupId iggcon.Identifier) (*iggcon.ConsumerGroupDetails, error) { - message := binaryserialization.SerializeIdentifiers(streamId, topicId, groupId) - buffer, err := c.sendAndFetchResponse(message, iggcon.GetGroupCode) + buffer, err := c.do(&iggcon.GetConsumerGroup{ + TopicPath: iggcon.TopicPath{ + StreamId: streamId, + TopicId: topicId, + }, + GroupId: groupId, + }) if err != nil { return nil, err } @@ -51,12 +58,13 @@ func (c *IggyTcpClient) CreateConsumerGroup(streamId iggcon.Identifier, topicId if MaxStringLength < len(name) || len(name) == 0 { return nil, ierror.ErrInvalidConsumerGroupName } - message := binaryserialization.CreateGroup(iggcon.CreateConsumerGroupRequest{ - StreamId: streamId, - TopicId: topicId, - Name: name, + buffer, err := c.do(&iggcon.CreateConsumerGroup{ + TopicPath: iggcon.TopicPath{ + StreamId: streamId, + TopicId: topicId, + }, + Name: name, }) - buffer, err := c.sendAndFetchResponse(message, iggcon.CreateGroupCode) if err != nil { return nil, err } @@ -65,19 +73,34 @@ func (c *IggyTcpClient) CreateConsumerGroup(streamId iggcon.Identifier, topicId } func (c *IggyTcpClient) DeleteConsumerGroup(streamId iggcon.Identifier, topicId iggcon.Identifier, groupId iggcon.Identifier) error { - message := binaryserialization.SerializeIdentifiers(streamId, topicId, groupId) - _, err := c.sendAndFetchResponse(message, iggcon.DeleteGroupCode) + _, err := c.do(&iggcon.DeleteConsumerGroup{ + TopicPath: iggcon.TopicPath{ + StreamId: streamId, + TopicId: topicId, + }, + GroupId: groupId, + }) return err } func (c *IggyTcpClient) JoinConsumerGroup(streamId iggcon.Identifier, topicId iggcon.Identifier, groupId iggcon.Identifier) error { - message := binaryserialization.SerializeIdentifiers(streamId, topicId, groupId) - _, err := c.sendAndFetchResponse(message, iggcon.JoinGroupCode) + _, err := c.do(&iggcon.JoinConsumerGroup{ + TopicPath: iggcon.TopicPath{ + StreamId: streamId, + TopicId: topicId, + }, + GroupId: groupId, + }) return err } func (c *IggyTcpClient) LeaveConsumerGroup(streamId iggcon.Identifier, topicId iggcon.Identifier, groupId iggcon.Identifier) error { - message := binaryserialization.SerializeIdentifiers(streamId, topicId, groupId) - _, err := c.sendAndFetchResponse(message, iggcon.LeaveGroupCode) + _, err := c.do(&iggcon.LeaveConsumerGroup{ + TopicPath: iggcon.TopicPath{ + StreamId: streamId, + TopicId: topicId, + }, + GroupId: groupId, + }) return err } diff --git a/foreign/go/client/tcp/tcp_messaging.go b/foreign/go/client/tcp/tcp_messaging.go index 69a3010192..22a5d008ba 100644 --- a/foreign/go/client/tcp/tcp_messaging.go +++ b/foreign/go/client/tcp/tcp_messaging.go @@ -36,13 +36,13 @@ func (c *IggyTcpClient) SendMessages( if len(messages) == 0 { return ierror.ErrInvalidMessagesCount } - serializedRequest := binaryserialization.TcpSendMessagesRequest{ + _, err := c.do(&iggcon.SendMessages{ + Compression: c.MessageCompression, StreamId: streamId, TopicId: topicId, Partitioning: partitioning, Messages: messages, - } - _, err := c.sendAndFetchResponse(serializedRequest.Serialize(c.MessageCompression), iggcon.SendMessagesCode) + }) return err } @@ -55,7 +55,7 @@ func (c *IggyTcpClient) PollMessages( autoCommit bool, partitionId *uint32, ) (*iggcon.PolledMessage, error) { - serializedRequest := binaryserialization.TcpFetchMessagesRequest{ + buffer, err := c.do(&iggcon.PollMessages{ StreamId: streamId, TopicId: topicId, Consumer: consumer, @@ -63,8 +63,7 @@ func (c *IggyTcpClient) PollMessages( Strategy: strategy, Count: count, PartitionId: partitionId, - } - buffer, err := c.sendAndFetchResponse(serializedRequest.Serialize(), iggcon.PollMessagesCode) + }) if err != nil { return nil, err } diff --git a/foreign/go/client/tcp/tcp_offset_management.go b/foreign/go/client/tcp/tcp_offset_management.go index 0c87bd1fc5..cec3c2c892 100644 --- a/foreign/go/client/tcp/tcp_offset_management.go +++ b/foreign/go/client/tcp/tcp_offset_management.go @@ -23,13 +23,12 @@ import ( ) func (c *IggyTcpClient) GetConsumerOffset(consumer iggcon.Consumer, streamId iggcon.Identifier, topicId iggcon.Identifier, partitionId *uint32) (*iggcon.ConsumerOffsetInfo, error) { - message := binaryserialization.GetOffset(iggcon.GetConsumerOffsetRequest{ + buffer, err := c.do(&iggcon.GetConsumerOffset{ StreamId: streamId, TopicId: topicId, Consumer: consumer, PartitionId: partitionId, }) - buffer, err := c.sendAndFetchResponse(message, iggcon.GetOffsetCode) if err != nil { return nil, err } @@ -38,24 +37,22 @@ func (c *IggyTcpClient) GetConsumerOffset(consumer iggcon.Consumer, streamId igg } func (c *IggyTcpClient) StoreConsumerOffset(consumer iggcon.Consumer, streamId iggcon.Identifier, topicId iggcon.Identifier, offset uint64, partitionId *uint32) error { - message := binaryserialization.UpdateOffset(iggcon.StoreConsumerOffsetRequest{ + _, err := c.do(&iggcon.StoreConsumerOffsetRequest{ StreamId: streamId, TopicId: topicId, Offset: offset, Consumer: consumer, PartitionId: partitionId, }) - _, err := c.sendAndFetchResponse(message, iggcon.StoreOffsetCode) return err } func (c *IggyTcpClient) DeleteConsumerOffset(consumer iggcon.Consumer, streamId iggcon.Identifier, topicId iggcon.Identifier, partitionId *uint32) error { - message := binaryserialization.DeleteOffset(iggcon.DeleteConsumerOffsetRequest{ + _, err := c.do(&iggcon.DeleteConsumerOffset{ Consumer: consumer, StreamId: streamId, TopicId: topicId, PartitionId: partitionId, }) - _, err := c.sendAndFetchResponse(message, iggcon.DeleteConsumerOffsetCode) return err } diff --git a/foreign/go/client/tcp/tcp_partition_management.go b/foreign/go/client/tcp/tcp_partition_management.go index 0a910240dc..5ed1171642 100644 --- a/foreign/go/client/tcp/tcp_partition_management.go +++ b/foreign/go/client/tcp/tcp_partition_management.go @@ -18,26 +18,23 @@ package tcp import ( - binaryserialization "github.com/apache/iggy/foreign/go/binary_serialization" iggcon "github.com/apache/iggy/foreign/go/contracts" ) func (c *IggyTcpClient) CreatePartitions(streamId iggcon.Identifier, topicId iggcon.Identifier, partitionsCount uint32) error { - message := binaryserialization.CreatePartitions(iggcon.CreatePartitionsRequest{ + _, err := c.do(&iggcon.CreatePartitions{ StreamId: streamId, TopicId: topicId, PartitionsCount: partitionsCount, }) - _, err := c.sendAndFetchResponse(message, iggcon.CreatePartitionsCode) return err } func (c *IggyTcpClient) DeletePartitions(streamId iggcon.Identifier, topicId iggcon.Identifier, partitionsCount uint32) error { - message := binaryserialization.DeletePartitions(iggcon.DeletePartitionsRequest{ + _, err := c.do(&iggcon.DeletePartitions{ StreamId: streamId, TopicId: topicId, PartitionsCount: partitionsCount, }) - _, err := c.sendAndFetchResponse(message, iggcon.DeletePartitionsCode) return err } diff --git a/foreign/go/client/tcp/tcp_session_management.go b/foreign/go/client/tcp/tcp_session_management.go index e446ad40f1..8b32b522ef 100644 --- a/foreign/go/client/tcp/tcp_session_management.go +++ b/foreign/go/client/tcp/tcp_session_management.go @@ -27,11 +27,10 @@ import ( ) func (c *IggyTcpClient) LoginUser(username string, password string) (*iggcon.IdentityInfo, error) { - serializedRequest := binaryserialization.TcpLogInRequest{ + buffer, err := c.do(&iggcon.LoginUser{ Username: username, Password: password, - } - buffer, err := c.sendAndFetchResponse(serializedRequest.Serialize(), iggcon.LoginUserCode) + }) if err != nil { return nil, err } @@ -51,10 +50,9 @@ func (c *IggyTcpClient) LoginUser(username string, password string) (*iggcon.Ide } func (c *IggyTcpClient) LoginWithPersonalAccessToken(token string) (*iggcon.IdentityInfo, error) { - message := binaryserialization.SerializeLoginWithPersonalAccessToken(iggcon.LoginWithPersonalAccessTokenRequest{ + buffer, err := c.do(&iggcon.LoginWithPersonalAccessToken{ Token: token, }) - buffer, err := c.sendAndFetchResponse(message, iggcon.LoginWithAccessTokenCode) if err != nil { return nil, err } @@ -74,7 +72,7 @@ func (c *IggyTcpClient) LoginWithPersonalAccessToken(token string) (*iggcon.Iden } func (c *IggyTcpClient) LogoutUser() error { - _, err := c.sendAndFetchResponse([]byte{}, iggcon.LogoutUserCode) + _, err := c.do(&iggcon.LogoutUser{}) return err } diff --git a/foreign/go/client/tcp/tcp_stream_management.go b/foreign/go/client/tcp/tcp_stream_management.go index e8ab8711a2..acc5b62021 100644 --- a/foreign/go/client/tcp/tcp_stream_management.go +++ b/foreign/go/client/tcp/tcp_stream_management.go @@ -24,7 +24,7 @@ import ( ) func (c *IggyTcpClient) GetStreams() ([]iggcon.Stream, error) { - buffer, err := c.sendAndFetchResponse([]byte{}, iggcon.GetStreamsCode) + buffer, err := c.do(&iggcon.GetStreams{}) if err != nil { return nil, err } @@ -33,8 +33,9 @@ func (c *IggyTcpClient) GetStreams() ([]iggcon.Stream, error) { } func (c *IggyTcpClient) GetStream(streamId iggcon.Identifier) (*iggcon.StreamDetails, error) { - message := binaryserialization.SerializeIdentifier(streamId) - buffer, err := c.sendAndFetchResponse(message, iggcon.GetStreamCode) + buffer, err := c.do(&iggcon.GetStream{ + StreamId: streamId, + }) if err != nil { return nil, err } @@ -54,8 +55,7 @@ func (c *IggyTcpClient) CreateStream(name string) (*iggcon.StreamDetails, error) if len(name) == 0 || MaxStringLength < len(name) { return nil, ierror.ErrInvalidStreamName } - serializedRequest := binaryserialization.TcpCreateStreamRequest{Name: name} - buffer, err := c.sendAndFetchResponse(serializedRequest.Serialize(), iggcon.CreateStreamCode) + buffer, err := c.do(&iggcon.CreateStream{Name: name}) if err != nil { return nil, err } @@ -71,13 +71,11 @@ func (c *IggyTcpClient) UpdateStream(streamId iggcon.Identifier, name string) er if len(name) > MaxStringLength || len(name) == 0 { return ierror.ErrInvalidStreamName } - serializedRequest := binaryserialization.TcpUpdateStreamRequest{StreamId: streamId, Name: name} - _, err := c.sendAndFetchResponse(serializedRequest.Serialize(), iggcon.UpdateStreamCode) + _, err := c.do(&iggcon.UpdateStream{StreamId: streamId, Name: name}) return err } func (c *IggyTcpClient) DeleteStream(id iggcon.Identifier) error { - message := binaryserialization.SerializeIdentifier(id) - _, err := c.sendAndFetchResponse(message, iggcon.DeleteStreamCode) + _, err := c.do(&iggcon.DeleteStream{StreamId: id}) return err } diff --git a/foreign/go/client/tcp/tcp_topic_management.go b/foreign/go/client/tcp/tcp_topic_management.go index 3bea2b2a5c..0c69ac8e90 100644 --- a/foreign/go/client/tcp/tcp_topic_management.go +++ b/foreign/go/client/tcp/tcp_topic_management.go @@ -24,8 +24,7 @@ import ( ) func (c *IggyTcpClient) GetTopics(streamId iggcon.Identifier) ([]iggcon.Topic, error) { - message := binaryserialization.SerializeIdentifier(streamId) - buffer, err := c.sendAndFetchResponse(message, iggcon.GetTopicsCode) + buffer, err := c.do(&iggcon.GetTopics{StreamId: streamId}) if err != nil { return nil, err } @@ -34,8 +33,7 @@ func (c *IggyTcpClient) GetTopics(streamId iggcon.Identifier) ([]iggcon.Topic, e } func (c *IggyTcpClient) GetTopic(streamId iggcon.Identifier, topicId iggcon.Identifier) (*iggcon.TopicDetails, error) { - message := binaryserialization.SerializeIdentifiers(streamId, topicId) - buffer, err := c.sendAndFetchResponse(message, iggcon.GetTopicCode) + buffer, err := c.do(&iggcon.GetTopic{StreamId: streamId, TopicId: topicId}) if err != nil { return nil, err } @@ -70,7 +68,7 @@ func (c *IggyTcpClient) CreateTopic( return nil, ierror.ErrInvalidReplicationFactor } - serializedRequest := binaryserialization.TcpCreateTopicRequest{ + buffer, err := c.do(&iggcon.CreateTopic{ StreamId: streamId, Name: name, PartitionsCount: partitionsCount, @@ -78,8 +76,7 @@ func (c *IggyTcpClient) CreateTopic( MessageExpiry: messageExpiry, MaxTopicSize: maxTopicSize, ReplicationFactor: replicationFactor, - } - buffer, err := c.sendAndFetchResponse(serializedRequest.Serialize(), iggcon.CreateTopicCode) + }) if err != nil { return nil, err } @@ -102,20 +99,19 @@ func (c *IggyTcpClient) UpdateTopic( if replicationFactor != nil && *replicationFactor == 0 { return ierror.ErrInvalidReplicationFactor } - serializedRequest := binaryserialization.TcpUpdateTopicRequest{ + _, err := c.do(&iggcon.UpdateTopic{ StreamId: streamId, TopicId: topicId, CompressionAlgorithm: compressionAlgorithm, MessageExpiry: messageExpiry, MaxTopicSize: maxTopicSize, ReplicationFactor: replicationFactor, - Name: name} - _, err := c.sendAndFetchResponse(serializedRequest.Serialize(), iggcon.UpdateTopicCode) + Name: name, + }) return err } func (c *IggyTcpClient) DeleteTopic(streamId, topicId iggcon.Identifier) error { - message := binaryserialization.SerializeIdentifiers(streamId, topicId) - _, err := c.sendAndFetchResponse(message, iggcon.DeleteTopicCode) + _, err := c.do(&iggcon.DeleteTopic{StreamId: streamId, TopicId: topicId}) return err } diff --git a/foreign/go/client/tcp/tcp_user_management.go b/foreign/go/client/tcp/tcp_user_management.go index bf53331d9a..c9836ebf1f 100644 --- a/foreign/go/client/tcp/tcp_user_management.go +++ b/foreign/go/client/tcp/tcp_user_management.go @@ -24,8 +24,7 @@ import ( ) func (c *IggyTcpClient) GetUser(identifier iggcon.Identifier) (*iggcon.UserInfoDetails, error) { - message := binaryserialization.SerializeIdentifier(identifier) - buffer, err := c.sendAndFetchResponse(message, iggcon.GetUserCode) + buffer, err := c.do(&iggcon.GetUser{Id: identifier}) if err != nil { return nil, err } @@ -37,7 +36,7 @@ func (c *IggyTcpClient) GetUser(identifier iggcon.Identifier) (*iggcon.UserInfoD } func (c *IggyTcpClient) GetUsers() ([]iggcon.UserInfo, error) { - buffer, err := c.sendAndFetchResponse([]byte{}, iggcon.GetUsersCode) + buffer, err := c.do(&iggcon.GetUsers{}) if err != nil { return nil, err } @@ -46,13 +45,12 @@ func (c *IggyTcpClient) GetUsers() ([]iggcon.UserInfo, error) { } func (c *IggyTcpClient) CreateUser(username string, password string, status iggcon.UserStatus, permissions *iggcon.Permissions) (*iggcon.UserInfoDetails, error) { - message := binaryserialization.SerializeCreateUserRequest(iggcon.CreateUserRequest{ + buffer, err := c.do(&iggcon.CreateUser{ Username: username, Password: password, Status: status, Permissions: permissions, }) - buffer, err := c.sendAndFetchResponse(message, iggcon.CreateUserCode) if err != nil { return nil, err } @@ -64,36 +62,34 @@ func (c *IggyTcpClient) CreateUser(username string, password string, status iggc } func (c *IggyTcpClient) UpdateUser(userID iggcon.Identifier, username *string, status *iggcon.UserStatus) error { - message := binaryserialization.SerializeUpdateUser(iggcon.UpdateUserRequest{ + _, err := c.do(&iggcon.UpdateUser{ UserID: userID, Username: username, Status: status, }) - _, err := c.sendAndFetchResponse(message, iggcon.UpdateUserCode) return err } func (c *IggyTcpClient) DeleteUser(identifier iggcon.Identifier) error { - message := binaryserialization.SerializeIdentifier(identifier) - _, err := c.sendAndFetchResponse(message, iggcon.DeleteUserCode) + _, err := c.do(&iggcon.DeleteUser{ + Id: identifier, + }) return err } func (c *IggyTcpClient) UpdatePermissions(userID iggcon.Identifier, permissions *iggcon.Permissions) error { - message := binaryserialization.SerializeUpdateUserPermissionsRequest(iggcon.UpdatePermissionsRequest{ + _, err := c.do(&iggcon.UpdatePermissions{ UserID: userID, Permissions: permissions, }) - _, err := c.sendAndFetchResponse(message, iggcon.UpdatePermissionsCode) return err } func (c *IggyTcpClient) ChangePassword(userID iggcon.Identifier, currentPassword string, newPassword string) error { - message := binaryserialization.SerializeChangePasswordRequest(iggcon.ChangePasswordRequest{ + _, err := c.do(&iggcon.ChangePassword{ UserID: userID, CurrentPassword: currentPassword, NewPassword: newPassword, }) - _, err := c.sendAndFetchResponse(message, iggcon.ChangePasswordCode) return err } diff --git a/foreign/go/client/tcp/tcp_utilities.go b/foreign/go/client/tcp/tcp_utilities.go index e9d667203e..bd5beaef35 100644 --- a/foreign/go/client/tcp/tcp_utilities.go +++ b/foreign/go/client/tcp/tcp_utilities.go @@ -23,7 +23,7 @@ import ( ) func (c *IggyTcpClient) GetStats() (*iggcon.Stats, error) { - buffer, err := c.sendAndFetchResponse([]byte{}, iggcon.GetStatsCode) + buffer, err := c.do(&iggcon.GetStats{}) if err != nil { return nil, err } @@ -35,6 +35,6 @@ func (c *IggyTcpClient) GetStats() (*iggcon.Stats, error) { } func (c *IggyTcpClient) Ping() error { - _, err := c.sendAndFetchResponse([]byte{}, iggcon.PingCode) + _, err := c.do(&iggcon.Ping{}) return err } diff --git a/foreign/go/contracts/access_tokens.go b/foreign/go/contracts/access_tokens.go index 773a5d77a1..edb9b4ef32 100644 --- a/foreign/go/contracts/access_tokens.go +++ b/foreign/go/contracts/access_tokens.go @@ -17,17 +17,55 @@ package iggcon -import "time" +import ( + "encoding/binary" + "time" +) -type CreatePersonalAccessTokenRequest struct { +type CreatePersonalAccessToken struct { Name string `json:"Name"` Expiry uint32 `json:"Expiry"` } -type DeletePersonalAccessTokenRequest struct { +func (c *CreatePersonalAccessToken) Code() CommandCode { + return CreateAccessTokenCode +} + +func (c *CreatePersonalAccessToken) MarshalBinary() ([]byte, error) { + length := 1 + len(c.Name) + 8 + bytes := make([]byte, length) + bytes[0] = byte(len(c.Name)) + copy(bytes[1:], c.Name) + binary.LittleEndian.PutUint32(bytes[len(bytes)-4:], c.Expiry) + return bytes, nil +} + +type DeletePersonalAccessToken struct { Name string `json:"Name"` } +func (d *DeletePersonalAccessToken) Code() CommandCode { + return DeleteAccessTokenCode +} + +func (d *DeletePersonalAccessToken) MarshalBinary() ([]byte, error) { + length := 1 + len(d.Name) + bytes := make([]byte, length) + bytes[0] = byte(len(d.Name)) + copy(bytes[1:], d.Name) + return bytes, nil +} + +type GetPersonalAccessTokens struct{} + +func (g *GetPersonalAccessTokens) Code() CommandCode { + return GetAccessTokensCode +} + +func (g *GetPersonalAccessTokens) MarshalBinary() ([]byte, error) { + return []byte{}, nil +} + type PersonalAccessTokenInfo struct { Name string `json:"Name"` Expiry *time.Time `json:"Expiry"` diff --git a/foreign/go/contracts/change_password.go b/foreign/go/contracts/change_password.go new file mode 100644 index 0000000000..f2c8c1d853 --- /dev/null +++ b/foreign/go/contracts/change_password.go @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package iggcon + +type ChangePassword struct { + UserID Identifier `json:"-"` + CurrentPassword string `json:"CurrentPassword"` + NewPassword string `json:"NewPassword"` +} + +func (c *ChangePassword) Code() CommandCode { + return ChangePasswordCode +} + +func (c *ChangePassword) MarshalBinary() ([]byte, error) { + userIdBytes, err := c.UserID.MarshalBinary() + if err != nil { + return nil, err + } + length := len(userIdBytes) + len(c.CurrentPassword) + len(c.NewPassword) + 2 + bytes := make([]byte, length) + position := 0 + + copy(bytes[position:position+len(userIdBytes)], userIdBytes) + position += len(userIdBytes) + + bytes[position] = byte(len(c.CurrentPassword)) + position++ + copy(bytes[position:position+len(c.CurrentPassword)], c.CurrentPassword) + position += len(c.CurrentPassword) + + bytes[position] = byte(len(c.NewPassword)) + position++ + copy(bytes[position:position+len(c.NewPassword)], c.NewPassword) + + return bytes, nil +} diff --git a/foreign/go/contracts/command.go b/foreign/go/contracts/command.go index d8b735da06..2c0bbdbfbb 100644 --- a/foreign/go/contracts/command.go +++ b/foreign/go/contracts/command.go @@ -17,7 +17,10 @@ package iggcon -import "encoding" +import ( + "encoding" + "encoding/binary" +) type Command interface { // Code returns the command code associated with this command. @@ -25,3 +28,27 @@ type Command interface { encoding.BinaryMarshaler } + +type GetClients struct{} + +func (c *GetClients) Code() CommandCode { + return GetClientsCode +} + +func (c *GetClients) MarshalBinary() ([]byte, error) { + return []byte{}, nil +} + +type GetClient struct { + ClientID uint32 +} + +func (c *GetClient) Code() CommandCode { + return GetClientCode +} + +func (c *GetClient) MarshalBinary() ([]byte, error) { + bytes := make([]byte, 4) + binary.LittleEndian.PutUint32(bytes, c.ClientID) + return bytes, nil +} diff --git a/foreign/go/contracts/consumer.go b/foreign/go/contracts/consumer.go index 84195ebba4..4d9bf10cc2 100644 --- a/foreign/go/contracts/consumer.go +++ b/foreign/go/contracts/consumer.go @@ -52,3 +52,14 @@ func NewGroupConsumer(id Identifier) Consumer { Id: id, } } + +func (c Consumer) MarshalBinary() ([]byte, error) { + idBytes, err := c.Id.MarshalBinary() + if err != nil { + return nil, err + } + bytes := make([]byte, 0, 1+len(idBytes)) + bytes = append(bytes, uint8(c.Kind)) + bytes = append(bytes, idBytes...) + return bytes, nil +} diff --git a/foreign/go/contracts/consumer_groups.go b/foreign/go/contracts/consumer_groups.go index 5018fd0378..3863a6b8e1 100644 --- a/foreign/go/contracts/consumer_groups.go +++ b/foreign/go/contracts/consumer_groups.go @@ -35,32 +35,105 @@ type ConsumerGroupMember struct { Partitions []uint32 } -type CreateConsumerGroupRequest struct { - StreamId Identifier `json:"streamId"` - TopicId Identifier `json:"topicId"` - Name string `json:"name"` +type TopicPath struct { + StreamId Identifier + TopicId Identifier } -type DeleteConsumerGroupRequest struct { - StreamId Identifier `json:"streamId"` - TopicId Identifier `json:"topicId"` - ConsumerGroupId Identifier `json:"consumerGroupId"` +type CreateConsumerGroup struct { + TopicPath + Name string } -type JoinConsumerGroupRequest struct { - StreamId Identifier `json:"streamId"` - TopicId Identifier `json:"topicId"` - ConsumerGroupId Identifier `json:"consumerGroupId"` +func (c *CreateConsumerGroup) Code() CommandCode { + return CreateGroupCode } -type LeaveConsumerGroupRequest struct { - StreamId Identifier `json:"streamId"` - TopicId Identifier `json:"topicId"` - ConsumerGroupId Identifier `json:"consumerGroupId"` +func (c *CreateConsumerGroup) MarshalBinary() ([]byte, error) { + streamIdBytes, err := c.StreamId.MarshalBinary() + if err != nil { + return nil, err + } + topicIdBytes, err := c.TopicId.MarshalBinary() + if err != nil { + return nil, err + } + offset := len(streamIdBytes) + len(topicIdBytes) + bytes := make([]byte, offset+1+len(c.Name)) + copy(bytes[0:len(streamIdBytes)], streamIdBytes) + copy(bytes[len(streamIdBytes):offset], topicIdBytes) + bytes[offset] = byte(len(c.Name)) + copy(bytes[offset+1:], c.Name) + return bytes, nil +} + +type DeleteConsumerGroup struct { + TopicPath + GroupId Identifier +} + +func (d *DeleteConsumerGroup) Code() CommandCode { + return DeleteGroupCode +} + +func (d *DeleteConsumerGroup) MarshalBinary() ([]byte, error) { + return marshalIdentifiers(d.StreamId, d.TopicId, d.GroupId) +} + +type JoinConsumerGroup struct { + TopicPath + GroupId Identifier +} + +func (j *JoinConsumerGroup) Code() CommandCode { + return JoinGroupCode +} + +func (j *JoinConsumerGroup) MarshalBinary() ([]byte, error) { + return marshalIdentifiers(j.StreamId, j.TopicId, j.GroupId) +} + +type LeaveConsumerGroup struct { + TopicPath + GroupId Identifier +} + +func (l *LeaveConsumerGroup) Code() CommandCode { + return LeaveGroupCode +} + +func (l *LeaveConsumerGroup) MarshalBinary() ([]byte, error) { + return marshalIdentifiers(l.StreamId, l.TopicId, l.GroupId) +} + +type GetConsumerGroup struct { + TopicPath + GroupId Identifier +} + +func (g *GetConsumerGroup) Code() CommandCode { + return GetGroupCode +} + +func (g *GetConsumerGroup) MarshalBinary() ([]byte, error) { + return marshalIdentifiers(g.StreamId, g.TopicId, g.GroupId) +} + +type GetConsumerGroups struct { + StreamId Identifier + TopicId Identifier +} + +func (g *GetConsumerGroups) Code() CommandCode { + return GetGroupsCode +} + +func (g *GetConsumerGroups) MarshalBinary() ([]byte, error) { + return marshalIdentifiers(g.StreamId, g.TopicId) } type ConsumerGroupInfo struct { - StreamId uint32 `json:"streamId"` - TopicId uint32 `json:"topicId"` - ConsumerGroupId uint32 `json:"consumerGroupId"` + StreamId uint32 `json:"streamId"` + TopicId uint32 `json:"topicId"` + GroupId uint32 `json:"groupId"` } diff --git a/foreign/go/binary_serialization/create_stream_serializer.go b/foreign/go/contracts/create_stream.go similarity index 79% rename from foreign/go/binary_serialization/create_stream_serializer.go rename to foreign/go/contracts/create_stream.go index be12c4b164..9c4c1a6314 100644 --- a/foreign/go/binary_serialization/create_stream_serializer.go +++ b/foreign/go/contracts/create_stream.go @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -package binaryserialization +package iggcon -type TcpCreateStreamRequest struct { +type CreateStream struct { Name string } @@ -26,10 +26,14 @@ const ( payloadOffset = 1 ) -func (request *TcpCreateStreamRequest) Serialize() []byte { +func (request *CreateStream) Code() CommandCode { + return CreateStreamCode +} + +func (request *CreateStream) MarshalBinary() ([]byte, error) { nameLength := len(request.Name) serialized := make([]byte, payloadOffset+nameLength) serialized[nameLengthOffset] = byte(nameLength) - copy(serialized[payloadOffset:], []byte(request.Name)) - return serialized + copy(serialized[payloadOffset:], request.Name) + return serialized, nil } diff --git a/foreign/go/binary_serialization/create_stream_serializer_test.go b/foreign/go/contracts/create_stream_test.go similarity index 89% rename from foreign/go/binary_serialization/create_stream_serializer_test.go rename to foreign/go/contracts/create_stream_test.go index 45a520df1d..19a9c15f94 100644 --- a/foreign/go/binary_serialization/create_stream_serializer_test.go +++ b/foreign/go/contracts/create_stream_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package binaryserialization +package iggcon import ( "reflect" @@ -23,14 +23,16 @@ import ( ) func TestSerialize_TcpCreateStreamRequest(t *testing.T) { - // Create a sample TcpCreateStreamRequest - request := TcpCreateStreamRequest{ + // Create a sample CreateStream + request := CreateStream{ Name: "test_stream", } // Serialize the request - serialized := request.Serialize() - + serialized, err := request.MarshalBinary() + if err != nil { + t.Errorf("Failed to serialize CreateStream: %v", err) + } // Expected serialized bytes expectedNameLength := byte(11) // Length of "test_stream" expectedPayload := []byte("test_stream") diff --git a/foreign/go/binary_serialization/create_topic_serializer.go b/foreign/go/contracts/create_topic.go similarity index 55% rename from foreign/go/binary_serialization/create_topic_serializer.go rename to foreign/go/contracts/create_topic.go index fc5f5f9eca..92a799a86a 100644 --- a/foreign/go/binary_serialization/create_topic_serializer.go +++ b/foreign/go/contracts/create_topic.go @@ -15,30 +15,36 @@ // specific language governing permissions and limitations // under the License. -package binaryserialization +package iggcon import ( "encoding/binary" - iggcon "github.com/apache/iggy/foreign/go/contracts" ) -type TcpCreateTopicRequest struct { - StreamId iggcon.Identifier `json:"streamId"` - PartitionsCount uint32 `json:"partitionsCount"` - CompressionAlgorithm iggcon.CompressionAlgorithm `json:"compressionAlgorithm"` - MessageExpiry iggcon.Duration `json:"messageExpiry"` - MaxTopicSize uint64 `json:"maxTopicSize"` - Name string `json:"name"` - ReplicationFactor *uint8 `json:"replicationFactor"` +type CreateTopic struct { + StreamId Identifier `json:"streamId"` + PartitionsCount uint32 `json:"partitionsCount"` + CompressionAlgorithm CompressionAlgorithm `json:"compressionAlgorithm"` + MessageExpiry Duration `json:"messageExpiry"` + MaxTopicSize uint64 `json:"maxTopicSize"` + Name string `json:"name"` + ReplicationFactor *uint8 `json:"replicationFactor"` } -func (request *TcpCreateTopicRequest) Serialize() []byte { - if request.ReplicationFactor == nil { - request.ReplicationFactor = new(uint8) +func (t *CreateTopic) Code() CommandCode { + return CreateTopicCode +} + +func (t *CreateTopic) MarshalBinary() ([]byte, error) { + if t.ReplicationFactor == nil { + t.ReplicationFactor = new(uint8) } - streamIdBytes := SerializeIdentifier(request.StreamId) - nameBytes := []byte(request.Name) + streamIdBytes, err := t.StreamId.MarshalBinary() + if err != nil { + return nil, err + } + nameBytes := []byte(t.Name) totalLength := len(streamIdBytes) + // StreamId 4 + // PartitionsCount @@ -57,23 +63,23 @@ func (request *TcpCreateTopicRequest) Serialize() []byte { position += len(streamIdBytes) // PartitionsCount - binary.LittleEndian.PutUint32(bytes[position:], request.PartitionsCount) + binary.LittleEndian.PutUint32(bytes[position:], t.PartitionsCount) position += 4 // CompressionAlgorithm - bytes[position] = byte(request.CompressionAlgorithm) + bytes[position] = byte(t.CompressionAlgorithm) position++ // MessageExpiry - binary.LittleEndian.PutUint64(bytes[position:], uint64(request.MessageExpiry)) + binary.LittleEndian.PutUint64(bytes[position:], uint64(t.MessageExpiry)) position += 8 // MaxTopicSize - binary.LittleEndian.PutUint64(bytes[position:], request.MaxTopicSize) + binary.LittleEndian.PutUint64(bytes[position:], t.MaxTopicSize) position += 8 // ReplicationFactor - bytes[position] = *request.ReplicationFactor + bytes[position] = *t.ReplicationFactor position++ // Name @@ -81,5 +87,5 @@ func (request *TcpCreateTopicRequest) Serialize() []byte { position++ copy(bytes[position:], nameBytes) - return bytes + return bytes, nil } diff --git a/foreign/go/contracts/create_user.go b/foreign/go/contracts/create_user.go new file mode 100644 index 0000000000..df82987192 --- /dev/null +++ b/foreign/go/contracts/create_user.go @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package iggcon + +import "encoding/binary" + +type CreateUser struct { + Username string `json:"username"` + Password string `json:"Password"` + Status UserStatus `json:"Status"` + Permissions *Permissions `json:"Permissions,omitempty"` +} + +func (c *CreateUser) Code() CommandCode { + return CreateUserCode +} + +func (c *CreateUser) MarshalBinary() ([]byte, error) { + capacity := 4 + len(c.Username) + len(c.Password) + if c.Permissions != nil { + capacity += 1 + 4 + c.Permissions.Size() + } + + bytes := make([]byte, capacity) + position := 0 + + bytes[position] = byte(len(c.Username)) + position += 1 + copy(bytes[position:position+len(c.Username)], []byte(c.Username)) + position += len(c.Username) + + bytes[position] = byte(len(c.Password)) + position += 1 + copy(bytes[position:position+len(c.Password)], []byte(c.Password)) + position += len(c.Password) + + statusByte := byte(0) + switch c.Status { + case Active: + statusByte = byte(1) + case Inactive: + statusByte = byte(2) + } + bytes[position] = statusByte + position += 1 + + if c.Permissions != nil { + bytes[position] = byte(1) + position += 1 + permissionsBytes, err := c.Permissions.MarshalBinary() + if err != nil { + return nil, err + } + binary.LittleEndian.PutUint32(bytes[position:position+4], uint32(len(permissionsBytes))) + position += 4 + copy(bytes[position:position+len(permissionsBytes)], permissionsBytes) + } else { + bytes[position] = byte(0) + } + + return bytes, nil +} diff --git a/foreign/go/binary_serialization/consumer_serializer.go b/foreign/go/contracts/delete_stream.go similarity index 71% rename from foreign/go/binary_serialization/consumer_serializer.go rename to foreign/go/contracts/delete_stream.go index a359ed51fc..fad72e388e 100644 --- a/foreign/go/binary_serialization/consumer_serializer.go +++ b/foreign/go/contracts/delete_stream.go @@ -15,14 +15,16 @@ // specific language governing permissions and limitations // under the License. -package binaryserialization +package iggcon -import iggcon "github.com/apache/iggy/foreign/go/contracts" +type DeleteStream struct { + StreamId Identifier +} + +func (d *DeleteStream) Code() CommandCode { + return DeleteStreamCode +} -func SerializeConsumer(consumer iggcon.Consumer) []byte { - idBytes := SerializeIdentifier(consumer.Id) - bytes := make([]byte, 0, 1+len(idBytes)) - bytes = append(bytes, uint8(consumer.Kind)) - bytes = append(bytes, idBytes...) - return bytes +func (d *DeleteStream) MarshalBinary() ([]byte, error) { + return d.StreamId.MarshalBinary() } diff --git a/foreign/go/contracts/login.go b/foreign/go/contracts/delete_topic.go similarity index 63% rename from foreign/go/contracts/login.go rename to foreign/go/contracts/delete_topic.go index 31c60b9ed4..d5b062168b 100644 --- a/foreign/go/contracts/login.go +++ b/foreign/go/contracts/delete_topic.go @@ -17,20 +17,15 @@ package iggcon -type LoginUserRequest struct { - Username string `json:"username"` - Password string `json:"password"` - Version string `json:"version,omitempty"` - Context string `json:"context,omitempty"` +type DeleteTopic struct { + StreamId Identifier + TopicId Identifier } -type LoginWithPersonalAccessTokenRequest struct { - Token string `json:"token"` +func (d *DeleteTopic) Code() CommandCode { + return DeleteTopicCode } -type IdentityInfo struct { - // Unique identifier (numeric) of the user. - UserId uint32 `json:"userId"` - // The optional tokens, used only by HTTP transport. - AccessToken *string `json:"accessToken"` +func (d *DeleteTopic) MarshalBinary() ([]byte, error) { + return marshalIdentifiers(d.StreamId, d.TopicId) } diff --git a/foreign/go/contracts/delete_user.go b/foreign/go/contracts/delete_user.go new file mode 100644 index 0000000000..013f8e1ca0 --- /dev/null +++ b/foreign/go/contracts/delete_user.go @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package iggcon + +type DeleteUser struct { + Id Identifier +} + +func (d *DeleteUser) Code() CommandCode { + return DeleteUserCode +} + +func (d *DeleteUser) MarshalBinary() ([]byte, error) { + return d.Id.MarshalBinary() +} diff --git a/foreign/go/contracts/get_stats.go b/foreign/go/contracts/get_stats.go new file mode 100644 index 0000000000..ec6e421060 --- /dev/null +++ b/foreign/go/contracts/get_stats.go @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package iggcon + +type GetStats struct{} + +func (c *GetStats) Code() CommandCode { + return GetStatsCode +} + +func (c *GetStats) MarshalBinary() ([]byte, error) { + return []byte{}, nil +} diff --git a/foreign/go/contracts/get_stream.go b/foreign/go/contracts/get_stream.go new file mode 100644 index 0000000000..9cc1a7d182 --- /dev/null +++ b/foreign/go/contracts/get_stream.go @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package iggcon + +type GetStream struct { + StreamId Identifier +} + +func (g *GetStream) Code() CommandCode { + return GetStreamCode +} + +func (g *GetStream) MarshalBinary() ([]byte, error) { + return g.StreamId.MarshalBinary() +} diff --git a/foreign/go/contracts/get_streams.go b/foreign/go/contracts/get_streams.go new file mode 100644 index 0000000000..e8b0e6083a --- /dev/null +++ b/foreign/go/contracts/get_streams.go @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package iggcon + +type GetStreams struct{} + +func (g *GetStreams) Code() CommandCode { + return GetStreamsCode +} + +func (g *GetStreams) MarshalBinary() ([]byte, error) { + return []byte{}, nil +} diff --git a/foreign/go/contracts/get_topic.go b/foreign/go/contracts/get_topic.go new file mode 100644 index 0000000000..1026bfd5c9 --- /dev/null +++ b/foreign/go/contracts/get_topic.go @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package iggcon + +type GetTopic struct { + StreamId Identifier + TopicId Identifier +} + +func (g *GetTopic) Code() CommandCode { + return GetTopicCode +} + +func (g *GetTopic) MarshalBinary() ([]byte, error) { + return marshalIdentifiers(g.StreamId, g.TopicId) +} diff --git a/foreign/go/contracts/get_topics.go b/foreign/go/contracts/get_topics.go new file mode 100644 index 0000000000..23dbbd53d7 --- /dev/null +++ b/foreign/go/contracts/get_topics.go @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package iggcon + +type GetTopics struct { + StreamId Identifier +} + +func (g *GetTopics) Code() CommandCode { + return GetTopicsCode +} + +func (g *GetTopics) MarshalBinary() ([]byte, error) { + return g.StreamId.MarshalBinary() +} diff --git a/foreign/go/contracts/get_user.go b/foreign/go/contracts/get_user.go new file mode 100644 index 0000000000..0ef25435ca --- /dev/null +++ b/foreign/go/contracts/get_user.go @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package iggcon + +type GetUser struct { + Id Identifier +} + +func (c *GetUser) Code() CommandCode { + return GetUserCode +} + +func (c *GetUser) MarshalBinary() ([]byte, error) { + return c.Id.MarshalBinary() +} diff --git a/foreign/go/contracts/get_users.go b/foreign/go/contracts/get_users.go new file mode 100644 index 0000000000..21c5b3de52 --- /dev/null +++ b/foreign/go/contracts/get_users.go @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package iggcon + +type GetUsers struct{} + +func (g *GetUsers) Code() CommandCode { + return GetUsersCode +} + +func (g *GetUsers) MarshalBinary() ([]byte, error) { + return []byte{}, nil +} diff --git a/foreign/go/contracts/identifier.go b/foreign/go/contracts/identifier.go index 0e3401acbe..3938d490ae 100644 --- a/foreign/go/contracts/identifier.go +++ b/foreign/go/contracts/identifier.go @@ -88,3 +88,35 @@ func (id Identifier) String() (string, error) { return string(id.Value), nil } + +func (id Identifier) MarshalBinary() ([]byte, error) { + bytes := make([]byte, id.Length+2) + bytes[0] = byte(id.Kind) + bytes[1] = byte(id.Length) + copy(bytes[2:], id.Value) + return bytes, nil +} + +func (id Identifier) AppendBinary(b []byte) ([]byte, error) { + b = append(b, byte(id.Kind), byte(id.Length)) + b = append(b, id.Value...) + return b, nil +} + +func marshalIdentifiers(identifiers ...Identifier) ([]byte, error) { + size := 0 + for i := 0; i < len(identifiers); i++ { + size += 2 + identifiers[i].Length + } + bytes := make([]byte, 0, size) + + for i := 0; i < len(identifiers); i++ { + var err error + bytes, err = identifiers[i].AppendBinary(bytes) + if err != nil { + return nil, err + } + } + + return bytes, nil +} diff --git a/foreign/go/binary_serialization/identifier_serializer_test.go b/foreign/go/contracts/identifier_test.go similarity index 82% rename from foreign/go/binary_serialization/identifier_serializer_test.go rename to foreign/go/contracts/identifier_test.go index 06fde46e1a..a83a893675 100644 --- a/foreign/go/binary_serialization/identifier_serializer_test.go +++ b/foreign/go/contracts/identifier_test.go @@ -15,23 +15,25 @@ // specific language governing permissions and limitations // under the License. -package binaryserialization +package iggcon import ( + "bytes" "errors" "testing" ierror "github.com/apache/iggy/foreign/go/errors" - - iggcon "github.com/apache/iggy/foreign/go/contracts" ) func TestSerializeIdentifier_StringId(t *testing.T) { // Test case for StringId - identifier, _ := iggcon.NewIdentifier("Hello") + identifier, _ := NewIdentifier("Hello") // Serialize the identifier - serialized := SerializeIdentifier(identifier) + serialized, err := identifier.MarshalBinary() + if err != nil { + t.Errorf("Error serializing identifier: %v", err) + } // Expected serialized bytes for StringId expected := []byte{ @@ -41,17 +43,20 @@ func TestSerializeIdentifier_StringId(t *testing.T) { } // Check if the serialized bytes match the expected bytes - if !areBytesEqual(serialized, expected) { + if !bytes.Equal(serialized, expected) { t.Errorf("Serialized bytes are incorrect for StringId. \nExpected:\t%v\nGot:\t\t%v", expected, serialized) } } func TestSerializeIdentifier_NumericId(t *testing.T) { // Test case for NumericId - identifier, _ := iggcon.NewIdentifier(uint32(123)) + identifier, _ := NewIdentifier(uint32(123)) // Serialize the identifier - serialized := SerializeIdentifier(identifier) + serialized, err := identifier.MarshalBinary() + if err != nil { + t.Errorf("Error serializing identifier: %v", err) + } // Expected serialized bytes for NumericId expected := []byte{ @@ -61,14 +66,14 @@ func TestSerializeIdentifier_NumericId(t *testing.T) { } // Check if the serialized bytes match the expected bytes - if !areBytesEqual(serialized, expected) { + if !bytes.Equal(serialized, expected) { t.Errorf("Serialized bytes are incorrect for NumericId. \nExpected:\t%v\nGot:\t\t%v", expected, serialized) } } func TestSerializeIdentifier_EmptyStringId(t *testing.T) { // Test case for an empty StringId - _, err := iggcon.NewIdentifier("") + _, err := NewIdentifier("") // Check if the serialized bytes match the expected bytes if !errors.Is(err, ierror.ErrInvalidIdentifier) { diff --git a/foreign/go/contracts/offsets.go b/foreign/go/contracts/offsets.go index d85c25f4f1..8726a5957f 100644 --- a/foreign/go/contracts/offsets.go +++ b/foreign/go/contracts/offsets.go @@ -17,6 +17,8 @@ package iggcon +import "encoding/binary" + type StoreConsumerOffsetRequest struct { StreamId Identifier `json:"streamId"` TopicId Identifier `json:"topicId"` @@ -25,22 +27,134 @@ type StoreConsumerOffsetRequest struct { Offset uint64 `json:"offset"` } -type GetConsumerOffsetRequest struct { +func (s *StoreConsumerOffsetRequest) Code() CommandCode { + return StoreOffsetCode +} + +func (s *StoreConsumerOffsetRequest) MarshalBinary() ([]byte, error) { + hasPartition := byte(0) + var partition uint32 = 0 + if s.PartitionId != nil { + hasPartition = 1 + partition = *s.PartitionId + } + consumerBytes, err := s.Consumer.MarshalBinary() + if err != nil { + return nil, err + } + streamIdBytes, err := s.StreamId.MarshalBinary() + if err != nil { + return nil, err + } + topicIdBytes, err := s.TopicId.MarshalBinary() + if err != nil { + return nil, err + } + // consumer + stream_id + topic_id + hasPartition(1) + partition(4) + offset(8) + bytes := make([]byte, len(consumerBytes)+len(streamIdBytes)+len(topicIdBytes)+13) + position := 0 + copy(bytes[position:], consumerBytes) + position += len(consumerBytes) + copy(bytes[position:], streamIdBytes) + position += len(streamIdBytes) + copy(bytes[position:], topicIdBytes) + position += len(topicIdBytes) + bytes[position] = hasPartition + binary.LittleEndian.PutUint32(bytes[position+1:position+5], partition) + binary.LittleEndian.PutUint64(bytes[position+5:position+13], s.Offset) + return bytes, nil +} + +type GetConsumerOffset struct { StreamId Identifier `json:"streamId"` TopicId Identifier `json:"topicId"` Consumer Consumer `json:"consumer"` PartitionId *uint32 `json:"partitionId"` } +func (g *GetConsumerOffset) Code() CommandCode { + return GetOffsetCode +} + +func (g *GetConsumerOffset) MarshalBinary() ([]byte, error) { + hasPartition := byte(0) + var partition uint32 = 0 + if g.PartitionId != nil { + hasPartition = 1 + partition = *g.PartitionId + } + consumerBytes, err := g.Consumer.MarshalBinary() + if err != nil { + return nil, err + } + streamIdBytes, err := g.StreamId.MarshalBinary() + if err != nil { + return nil, err + } + topicIdBytes, err := g.TopicId.MarshalBinary() + if err != nil { + return nil, err + } + // consumer + stream_id + topic_id + hasPartition(1) + partition(4) + bytes := make([]byte, len(consumerBytes)+len(streamIdBytes)+len(topicIdBytes)+5) + position := 0 + copy(bytes[position:], consumerBytes) + position += len(consumerBytes) + copy(bytes[position:], streamIdBytes) + position += len(streamIdBytes) + copy(bytes[position:], topicIdBytes) + position += len(topicIdBytes) + bytes[position] = hasPartition + binary.LittleEndian.PutUint32(bytes[position+1:position+5], partition) + return bytes, nil +} + type ConsumerOffsetInfo struct { PartitionId uint32 `json:"partitionId"` CurrentOffset uint64 `json:"currentOffset"` StoredOffset uint64 `json:"storedOffset"` } -type DeleteConsumerOffsetRequest struct { +type DeleteConsumerOffset struct { Consumer Consumer StreamId Identifier TopicId Identifier PartitionId *uint32 } + +func (d *DeleteConsumerOffset) Code() CommandCode { + return DeleteConsumerOffsetCode +} + +func (d *DeleteConsumerOffset) MarshalBinary() ([]byte, error) { + hasPartition := byte(0) + var partition uint32 = 0 + if d.PartitionId != nil { + hasPartition = 1 + partition = *d.PartitionId + } + consumerBytes, err := d.Consumer.MarshalBinary() + if err != nil { + return nil, err + } + streamIdBytes, err := d.StreamId.MarshalBinary() + if err != nil { + return nil, err + } + topicIdBytes, err := d.TopicId.MarshalBinary() + if err != nil { + return nil, err + } + // consumer + stream_id + topic_id + hasPartition(1) + partition(4) + bytes := make([]byte, len(consumerBytes)+len(streamIdBytes)+len(topicIdBytes)+5) + position := 0 + copy(bytes[position:], consumerBytes) + position += len(consumerBytes) + copy(bytes[position:], streamIdBytes) + position += len(streamIdBytes) + copy(bytes[position:], topicIdBytes) + position += len(topicIdBytes) + bytes[position] = hasPartition + binary.LittleEndian.PutUint32(bytes[position+1:position+5], partition) + return bytes, nil +} diff --git a/foreign/go/contracts/partitions.go b/foreign/go/contracts/partitions.go index ff32a17088..00ab0244cf 100644 --- a/foreign/go/contracts/partitions.go +++ b/foreign/go/contracts/partitions.go @@ -33,18 +33,66 @@ type PartitionContract struct { SizeBytes uint64 `json:"sizeBytes"` } -type CreatePartitionsRequest struct { +type CreatePartitions struct { StreamId Identifier `json:"streamId"` TopicId Identifier `json:"topicId"` PartitionsCount uint32 `json:"partitionsCount"` } -type DeletePartitionsRequest struct { +func (c *CreatePartitions) Code() CommandCode { + return CreatePartitionsCode +} + +func (c *CreatePartitions) MarshalBinary() ([]byte, error) { + streamIdBytes, err := c.StreamId.MarshalBinary() + if err != nil { + return nil, err + } + topicIdBytes, err := c.TopicId.MarshalBinary() + if err != nil { + return nil, err + } + bytes := make([]byte, len(streamIdBytes)+len(topicIdBytes)+4) + position := 0 + copy(bytes[position:], streamIdBytes) + position += len(streamIdBytes) + copy(bytes[position:], topicIdBytes) + position += len(topicIdBytes) + binary.LittleEndian.PutUint32(bytes[position:position+4], c.PartitionsCount) + + return bytes, nil +} + +type DeletePartitions struct { StreamId Identifier `json:"streamId"` TopicId Identifier `json:"topicId"` PartitionsCount uint32 `json:"partitionsCount"` } +func (d *DeletePartitions) Code() CommandCode { + return DeletePartitionsCode +} + +func (d *DeletePartitions) MarshalBinary() ([]byte, error) { + streamIdBytes, err := d.StreamId.MarshalBinary() + if err != nil { + return nil, err + } + topicIdBytes, err := d.TopicId.MarshalBinary() + if err != nil { + return nil, err + } + bytes := make([]byte, len(streamIdBytes)+len(topicIdBytes)+4) + position := 0 + copy(bytes[position:], streamIdBytes) + position += len(streamIdBytes) + copy(bytes[position:], topicIdBytes) + position += len(topicIdBytes) + binary.LittleEndian.PutUint32(bytes[position:position+4], d.PartitionsCount) + + return bytes, nil +} + type PartitioningKind int const ( @@ -130,3 +178,11 @@ func EntityIdGuid(value uuid.UUID) Partitioning { Value: bytes, } } + +func (p Partitioning) MarshalBinary() ([]byte, error) { + bytes := make([]byte, 2+p.Length) + bytes[0] = byte(p.Kind) + bytes[1] = byte(p.Length) + copy(bytes[2:], p.Value) + return bytes, nil +} diff --git a/foreign/go/contracts/ping.go b/foreign/go/contracts/ping.go new file mode 100644 index 0000000000..dd43a6aad2 --- /dev/null +++ b/foreign/go/contracts/ping.go @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package iggcon + +type Ping struct{} + +func (p *Ping) Code() CommandCode { + return PingCode +} + +func (p *Ping) MarshalBinary() ([]byte, error) { + return []byte{}, nil +} diff --git a/foreign/go/binary_serialization/fetch_messages_request_serializer.go b/foreign/go/contracts/poll_messages.go similarity index 62% rename from foreign/go/binary_serialization/fetch_messages_request_serializer.go rename to foreign/go/contracts/poll_messages.go index 798918e48b..efeaf8171f 100644 --- a/foreign/go/binary_serialization/fetch_messages_request_serializer.go +++ b/foreign/go/contracts/poll_messages.go @@ -15,12 +15,10 @@ // specific language governing permissions and limitations // under the License. -package binaryserialization +package iggcon import ( "encoding/binary" - - iggcon "github.com/apache/iggy/foreign/go/contracts" ) const ( @@ -31,24 +29,37 @@ const ( commitFlagSize = 1 ) -type TcpFetchMessagesRequest struct { - StreamId iggcon.Identifier `json:"streamId"` - TopicId iggcon.Identifier `json:"topicId"` - Consumer iggcon.Consumer `json:"consumer"` - PartitionId *uint32 `json:"partitionId"` - Strategy iggcon.PollingStrategy `json:"pollingStrategy"` - Count uint32 `json:"count"` - AutoCommit bool `json:"autoCommit"` +type PollMessages struct { + StreamId Identifier `json:"streamId"` + TopicId Identifier `json:"topicId"` + Consumer Consumer `json:"consumer"` + PartitionId *uint32 `json:"partitionId"` + Strategy PollingStrategy `json:"pollingStrategy"` + Count uint32 `json:"count"` + AutoCommit bool `json:"autoCommit"` +} + +func (m *PollMessages) Code() CommandCode { + return PollMessagesCode } -func (request *TcpFetchMessagesRequest) Serialize() []byte { - consumerIdBytes := SerializeIdentifier(request.Consumer.Id) - streamIdBytes := SerializeIdentifier(request.StreamId) - topicIdBytes := SerializeIdentifier(request.TopicId) +func (m *PollMessages) MarshalBinary() ([]byte, error) { + consumerIdBytes, err := m.Consumer.Id.MarshalBinary() + if err != nil { + return nil, err + } + streamIdBytes, err := m.StreamId.MarshalBinary() + if err != nil { + return nil, err + } + topicIdBytes, err := m.TopicId.MarshalBinary() + if err != nil { + return nil, err + } messageSize := 1 + len(consumerIdBytes) + len(streamIdBytes) + len(topicIdBytes) + partitionStrategySize + offsetSize + commitFlagSize bytes := make([]byte, messageSize) - bytes[0] = byte(request.Consumer.Kind) + bytes[0] = byte(m.Consumer.Kind) position := 1 copy(bytes[position:position+len(consumerIdBytes)], consumerIdBytes) position += len(consumerIdBytes) @@ -57,26 +68,26 @@ func (request *TcpFetchMessagesRequest) Serialize() []byte { position += len(streamIdBytes) copy(bytes[position:position+len(topicIdBytes)], topicIdBytes) position += len(topicIdBytes) - if request.PartitionId != nil { + if m.PartitionId != nil { bytes[position] = 1 - binary.LittleEndian.PutUint32(bytes[position+1:position+1+4], *request.PartitionId) + binary.LittleEndian.PutUint32(bytes[position+1:position+1+4], *m.PartitionId) } else { bytes[position] = 0 binary.LittleEndian.PutUint32(bytes[position+1:position+1+4], 0) } - bytes[position+1+4] = byte(request.Strategy.Kind) + bytes[position+1+4] = byte(m.Strategy.Kind) position += partitionStrategySize - binary.LittleEndian.PutUint64(bytes[position:position+8], request.Strategy.Value) - binary.LittleEndian.PutUint32(bytes[position+8:position+12], request.Count) + binary.LittleEndian.PutUint64(bytes[position:position+8], m.Strategy.Value) + binary.LittleEndian.PutUint32(bytes[position+8:position+12], m.Count) position += offsetSize - if request.AutoCommit { + if m.AutoCommit { bytes[position] = 1 } else { bytes[position] = 0 } - return bytes + return bytes, nil } diff --git a/foreign/go/binary_serialization/fetch_messages_request_serializer_test.go b/foreign/go/contracts/poll_messages_test.go similarity index 85% rename from foreign/go/binary_serialization/fetch_messages_request_serializer_test.go rename to foreign/go/contracts/poll_messages_test.go index 0c46d0ca24..a1455432c5 100644 --- a/foreign/go/binary_serialization/fetch_messages_request_serializer_test.go +++ b/foreign/go/contracts/poll_messages_test.go @@ -15,32 +15,33 @@ // specific language governing permissions and limitations // under the License. -package binaryserialization +package iggcon import ( "testing" - - iggcon "github.com/apache/iggy/foreign/go/contracts" ) func TestSerialize_TcpFetchMessagesRequest(t *testing.T) { partitionId := uint32(123) - consumerId, _ := iggcon.NewIdentifier(uint32(42)) - streamId, _ := iggcon.NewIdentifier("test_stream_id") - topicId, _ := iggcon.NewIdentifier("test_topic_id") - // Create a sample TcpFetchMessagesRequest - request := TcpFetchMessagesRequest{ - Consumer: iggcon.NewSingleConsumer(consumerId), + consumerId, _ := NewIdentifier(uint32(42)) + streamId, _ := NewIdentifier("test_stream_id") + topicId, _ := NewIdentifier("test_topic_id") + // Create a sample PollMessages + request := PollMessages{ + Consumer: NewSingleConsumer(consumerId), StreamId: streamId, TopicId: topicId, PartitionId: &partitionId, - Strategy: iggcon.FirstPollingStrategy(), + Strategy: FirstPollingStrategy(), Count: 100, AutoCommit: true, } // Serialize the request - serialized := request.Serialize() + serialized, err := request.MarshalBinary() + if err != nil { + t.Error(err) + } // Expected serialized bytes based on the provided sample request expected := []byte{ diff --git a/foreign/go/binary_serialization/send_messages_request_serializer.go b/foreign/go/contracts/send_messages.go similarity index 62% rename from foreign/go/binary_serialization/send_messages_request_serializer.go rename to foreign/go/contracts/send_messages.go index 2ee145d415..3effccfc32 100644 --- a/foreign/go/binary_serialization/send_messages_request_serializer.go +++ b/foreign/go/contracts/send_messages.go @@ -15,60 +15,74 @@ // specific language governing permissions and limitations // under the License. -package binaryserialization +package iggcon import ( "encoding/binary" - iggcon "github.com/apache/iggy/foreign/go/contracts" "github.com/klauspost/compress/s2" ) -type TcpSendMessagesRequest struct { - StreamId iggcon.Identifier `json:"streamId"` - TopicId iggcon.Identifier `json:"topicId"` - Partitioning iggcon.Partitioning `json:"partitioning"` - Messages []iggcon.IggyMessage `json:"messages"` +type SendMessages struct { + Compression IggyMessageCompression + + StreamId Identifier `json:"streamId"` + TopicId Identifier `json:"topicId"` + Partitioning Partitioning `json:"partitioning"` + Messages []IggyMessage `json:"messages"` } const indexSize = 16 -func (request *TcpSendMessagesRequest) Serialize(compression iggcon.IggyMessageCompression) []byte { - for i, message := range request.Messages { - switch compression { - case iggcon.MESSAGE_COMPRESSION_S2: +func (s *SendMessages) Code() CommandCode { + return SendMessagesCode +} + +func (s *SendMessages) MarshalBinary() ([]byte, error) { + for i, message := range s.Messages { + switch s.Compression { + case MESSAGE_COMPRESSION_S2: if len(message.Payload) < 32 { break } - request.Messages[i].Payload = s2.Encode(nil, message.Payload) + s.Messages[i].Payload = s2.Encode(nil, message.Payload) message.Header.PayloadLength = uint32(len(message.Payload)) - case iggcon.MESSAGE_COMPRESSION_S2_BETTER: + case MESSAGE_COMPRESSION_S2_BETTER: if len(message.Payload) < 32 { break } - request.Messages[i].Payload = s2.EncodeBetter(nil, message.Payload) + s.Messages[i].Payload = s2.EncodeBetter(nil, message.Payload) message.Header.PayloadLength = uint32(len(message.Payload)) - case iggcon.MESSAGE_COMPRESSION_S2_BEST: + case MESSAGE_COMPRESSION_S2_BEST: if len(message.Payload) < 32 { break } - request.Messages[i].Payload = s2.EncodeBest(nil, message.Payload) + s.Messages[i].Payload = s2.EncodeBest(nil, message.Payload) message.Header.PayloadLength = uint32(len(message.Payload)) } } - streamIdBytes := SerializeIdentifier(request.StreamId) - topicIdBytes := SerializeIdentifier(request.TopicId) - partitioningBytes := SerializePartitioning(request.Partitioning) + streamIdBytes, err := s.StreamId.MarshalBinary() + if err != nil { + return nil, err + } + topicIdBytes, err := s.TopicId.MarshalBinary() + if err != nil { + return nil, err + } + partitioningBytes, err := s.Partitioning.MarshalBinary() + if err != nil { + return nil, err + } metadataLenFieldSize := 4 // uint32 - messageCount := len(request.Messages) + messageCount := len(s.Messages) messagesCountFieldSize := 4 // uint32 metadataLen := len(streamIdBytes) + len(topicIdBytes) + len(partitioningBytes) + messagesCountFieldSize indexesSize := messageCount * indexSize - messageBytesCount := calculateMessageBytesCount(request.Messages) + messageBytesCount := calculateMessageBytesCount(s.Messages) totalSize := metadataLenFieldSize + len(streamIdBytes) + len(topicIdBytes) + @@ -103,14 +117,14 @@ func (request *TcpSendMessagesRequest) Serialize(compression iggcon.IggyMessageC position += indexesSize msgSize := uint32(0) - for _, message := range request.Messages { - copy(bytes[position:position+iggcon.MessageHeaderSize], message.Header.ToBytes()) - copy(bytes[position+iggcon.MessageHeaderSize:position+iggcon.MessageHeaderSize+int(message.Header.PayloadLength)], message.Payload) - position += iggcon.MessageHeaderSize + int(message.Header.PayloadLength) + for _, message := range s.Messages { + copy(bytes[position:position+MessageHeaderSize], message.Header.ToBytes()) + copy(bytes[position+MessageHeaderSize:position+MessageHeaderSize+int(message.Header.PayloadLength)], message.Payload) + position += MessageHeaderSize + int(message.Header.PayloadLength) copy(bytes[position:position+int(message.Header.UserHeaderLength)], message.UserHeaders) position += int(message.Header.UserHeaderLength) - msgSize += iggcon.MessageHeaderSize + message.Header.PayloadLength + message.Header.UserHeaderLength + msgSize += MessageHeaderSize + message.Header.PayloadLength + message.Header.UserHeaderLength binary.LittleEndian.PutUint32(bytes[currentIndexPosition:currentIndexPosition+4], 0) binary.LittleEndian.PutUint32(bytes[currentIndexPosition+4:currentIndexPosition+8], uint32(msgSize)) @@ -118,13 +132,13 @@ func (request *TcpSendMessagesRequest) Serialize(compression iggcon.IggyMessageC currentIndexPosition += indexSize } - return bytes + return bytes, nil } -func calculateMessageBytesCount(messages []iggcon.IggyMessage) int { +func calculateMessageBytesCount(messages []IggyMessage) int { count := 0 for _, msg := range messages { - count += iggcon.MessageHeaderSize + len(msg.Payload) + len(msg.UserHeaders) + count += MessageHeaderSize + len(msg.Payload) + len(msg.UserHeaders) } return count } diff --git a/foreign/go/binary_serialization/send_messages_request_serializer_test.go b/foreign/go/contracts/send_messages_test.go similarity index 70% rename from foreign/go/binary_serialization/send_messages_request_serializer_test.go rename to foreign/go/contracts/send_messages_test.go index c319e11ae3..1ddf4f831f 100644 --- a/foreign/go/binary_serialization/send_messages_request_serializer_test.go +++ b/foreign/go/contracts/send_messages_test.go @@ -15,30 +15,34 @@ // specific language governing permissions and limitations // under the License. -package binaryserialization +package iggcon import ( + "bytes" "testing" - iggcon "github.com/apache/iggy/foreign/go/contracts" "github.com/google/uuid" ) func TestSerialize_SendMessagesRequest(t *testing.T) { message1 := generateTestMessage("data1") - streamId, _ := iggcon.NewIdentifier("test_stream_id") - topicId, _ := iggcon.NewIdentifier("test_topic_id") - request := TcpSendMessagesRequest{ + streamId, _ := NewIdentifier("test_stream_id") + topicId, _ := NewIdentifier("test_topic_id") + request := SendMessages{ StreamId: streamId, TopicId: topicId, - Partitioning: iggcon.PartitionId(1), - Messages: []iggcon.IggyMessage{ + Partitioning: PartitionId(1), + Messages: []IggyMessage{ message1, }, + Compression: MESSAGE_COMPRESSION_NONE, } // Serialize the request - serialized := request.Serialize(iggcon.MESSAGE_COMPRESSION_NONE) + serialized, err := request.MarshalBinary() + if err != nil { + t.Error(err) + } // Expected serialized bytes based on the provided sample request expected := []byte{ @@ -61,22 +65,22 @@ func TestSerialize_SendMessagesRequest(t *testing.T) { expected = append(expected, message1.UserHeaders...) // Check if the serialized bytes match the expected bytes - if !areBytesEqual(serialized, expected) { + if !bytes.Equal(serialized, expected) { t.Errorf("Serialized bytes are incorrect. \nExpected:\t%v\nGot:\t\t%v", expected, serialized) } } -func createDefaultMessageHeaders() []iggcon.HeaderEntry { - return []iggcon.HeaderEntry{ - {Key: iggcon.HeaderKey{Kind: iggcon.String, Value: []byte("HeaderKey1")}, Value: iggcon.HeaderValue{Kind: iggcon.String, Value: []byte("Value 1")}}, - {Key: iggcon.HeaderKey{Kind: iggcon.String, Value: []byte("HeaderKey2")}, Value: iggcon.HeaderValue{Kind: iggcon.Uint32, Value: []byte{0x01, 0x02, 0x03, 0x04}}}, +func createDefaultMessageHeaders() []HeaderEntry { + return []HeaderEntry{ + {Key: HeaderKey{Kind: String, Value: []byte("HeaderKey1")}, Value: HeaderValue{Kind: String, Value: []byte("Value 1")}}, + {Key: HeaderKey{Kind: String, Value: []byte("HeaderKey2")}, Value: HeaderValue{Kind: Uint32, Value: []byte{0x01, 0x02, 0x03, 0x04}}}, } } -func generateTestMessage(payload string) iggcon.IggyMessage { - msg, _ := iggcon.NewIggyMessage( +func generateTestMessage(payload string) IggyMessage { + msg, _ := NewIggyMessage( []byte(payload), - iggcon.WithID(uuid.New()), - iggcon.WithUserHeaders(createDefaultMessageHeaders())) + WithID(uuid.New()), + WithUserHeaders(createDefaultMessageHeaders())) return msg } diff --git a/foreign/go/binary_serialization/log_in_request_serializer.go b/foreign/go/contracts/session.go similarity index 62% rename from foreign/go/binary_serialization/log_in_request_serializer.go rename to foreign/go/contracts/session.go index bd714be6bb..4e62ab88c2 100644 --- a/foreign/go/binary_serialization/log_in_request_serializer.go +++ b/foreign/go/contracts/session.go @@ -15,20 +15,22 @@ // specific language governing permissions and limitations // under the License. -package binaryserialization +package iggcon -import ( - "encoding/binary" -) +import "encoding/binary" -type TcpLogInRequest struct { +type LoginUser struct { Username string `json:"username"` Password string `json:"password"` } -func (request *TcpLogInRequest) Serialize() []byte { - usernameBytes := []byte(request.Username) - passwordBytes := []byte(request.Password) +func (lu *LoginUser) Code() CommandCode { + return LoginUserCode +} + +func (lu *LoginUser) MarshalBinary() ([]byte, error) { + usernameBytes := []byte(lu.Username) + passwordBytes := []byte(lu.Password) versionBytes := []byte("") contextBytes := []byte("") @@ -62,5 +64,38 @@ func (request *TcpLogInRequest) Serialize() []byte { position += 4 copy(result[position:], contextBytes) - return result + return result, nil +} + +type LoginWithPersonalAccessToken struct { + Token string `json:"token"` +} + +func (lw *LoginWithPersonalAccessToken) Code() CommandCode { + return LoginWithAccessTokenCode +} + +func (lw *LoginWithPersonalAccessToken) MarshalBinary() ([]byte, error) { + length := 1 + len(lw.Token) + bytes := make([]byte, length) + bytes[0] = byte(len(lw.Token)) + copy(bytes[1:], lw.Token) + return bytes, nil +} + +type IdentityInfo struct { + // Unique identifier (numeric) of the user. + UserId uint32 `json:"userId"` + // The optional tokens, used only by HTTP transport. + AccessToken *string `json:"accessToken"` +} + +type LogoutUser struct{} + +func (lu *LogoutUser) Code() CommandCode { + return LogoutUserCode +} + +func (lu *LogoutUser) MarshalBinary() ([]byte, error) { + return []byte{}, nil } diff --git a/foreign/go/binary_serialization/update_stream_serializer.go b/foreign/go/contracts/update_stream.go similarity index 69% rename from foreign/go/binary_serialization/update_stream_serializer.go rename to foreign/go/contracts/update_stream.go index c6a95bc940..2da4869ad8 100644 --- a/foreign/go/binary_serialization/update_stream_serializer.go +++ b/foreign/go/contracts/update_stream.go @@ -15,24 +15,26 @@ // specific language governing permissions and limitations // under the License. -package binaryserialization +package iggcon -import ( - iggcon "github.com/apache/iggy/foreign/go/contracts" -) - -type TcpUpdateStreamRequest struct { - StreamId iggcon.Identifier `json:"streamId"` - Name string `json:"name"` +type UpdateStream struct { + StreamId Identifier `json:"streamId"` + Name string `json:"name"` } -func (request *TcpUpdateStreamRequest) Serialize() []byte { - streamIdBytes := SerializeIdentifier(request.StreamId) - nameLength := len(request.Name) +func (u *UpdateStream) Code() CommandCode { + return UpdateStreamCode +} +func (u *UpdateStream) MarshalBinary() ([]byte, error) { + streamIdBytes, err := u.StreamId.MarshalBinary() + if err != nil { + return nil, err + } + nameLength := len(u.Name) bytes := make([]byte, len(streamIdBytes)+1+nameLength) copy(bytes[0:len(streamIdBytes)], streamIdBytes) position := len(streamIdBytes) bytes[position] = byte(nameLength) - copy(bytes[position+1:], []byte(request.Name)) - return bytes + copy(bytes[position+1:], u.Name) + return bytes, nil } diff --git a/foreign/go/binary_serialization/update_stream_serializer_test.go b/foreign/go/contracts/update_stream_test.go similarity index 85% rename from foreign/go/binary_serialization/update_stream_serializer_test.go rename to foreign/go/contracts/update_stream_test.go index ab0f223001..abb480bb00 100644 --- a/foreign/go/binary_serialization/update_stream_serializer_test.go +++ b/foreign/go/contracts/update_stream_test.go @@ -15,22 +15,24 @@ // specific language governing permissions and limitations // under the License. -package binaryserialization +package iggcon import ( + "bytes" "testing" - - iggcon "github.com/apache/iggy/foreign/go/contracts" ) func TestSerialize_UpdateStream(t *testing.T) { - streamId, _ := iggcon.NewIdentifier("stream") - request := TcpUpdateStreamRequest{ + streamId, _ := NewIdentifier("stream") + request := UpdateStream{ StreamId: streamId, Name: "update_stream", } - serialized1 := request.Serialize() + serialized1, err := request.MarshalBinary() + if err != nil { + t.Errorf("Failed to serialize UpdateStream: %v", err) + } expected := []byte{ 0x02, // StreamId Kind (StringId) @@ -40,7 +42,7 @@ func TestSerialize_UpdateStream(t *testing.T) { 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x5F, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6D, // Name ("update_stream") } - if !areBytesEqual(serialized1, expected) { + if !bytes.Equal(serialized1, expected) { t.Errorf("Test case 1 failed. \nExpected:\t%v\nGot:\t\t%v", expected, serialized1) } } diff --git a/foreign/go/contracts/update_topic.go b/foreign/go/contracts/update_topic.go new file mode 100644 index 0000000000..ef5de48632 --- /dev/null +++ b/foreign/go/contracts/update_topic.go @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package iggcon + +import ( + "encoding/binary" +) + +type UpdateTopic struct { + StreamId Identifier `json:"streamId"` + TopicId Identifier `json:"topicId"` + CompressionAlgorithm CompressionAlgorithm `json:"compressionAlgorithm"` + MessageExpiry Duration `json:"messageExpiry"` + MaxTopicSize uint64 `json:"maxTopicSize"` + ReplicationFactor *uint8 `json:"replicationFactor"` + Name string `json:"name"` +} + +func (u *UpdateTopic) Code() CommandCode { + return UpdateTopicCode +} + +func (u *UpdateTopic) MarshalBinary() ([]byte, error) { + if u.ReplicationFactor == nil { + u.ReplicationFactor = new(uint8) + } + streamIdBytes, err := u.StreamId.MarshalBinary() + if err != nil { + return nil, err + } + topicIdBytes, err := u.TopicId.MarshalBinary() + if err != nil { + return nil, err + } + + buffer := make([]byte, 19+len(streamIdBytes)+len(topicIdBytes)+len(u.Name)) + + offset := 0 + + offset += copy(buffer[offset:], streamIdBytes) + offset += copy(buffer[offset:], topicIdBytes) + + buffer[offset] = byte(u.CompressionAlgorithm) + offset++ + + binary.LittleEndian.PutUint64(buffer[offset:], uint64(u.MessageExpiry)) + offset += 8 + + binary.LittleEndian.PutUint64(buffer[offset:], u.MaxTopicSize) + offset += 8 + + buffer[offset] = *u.ReplicationFactor + offset++ + + buffer[offset] = uint8(len(u.Name)) + offset++ + + copy(buffer[offset:], u.Name) + + return buffer, nil +} diff --git a/foreign/go/binary_serialization/update_topic_serializer_test.go b/foreign/go/contracts/update_topic_test.go similarity index 85% rename from foreign/go/binary_serialization/update_topic_serializer_test.go rename to foreign/go/contracts/update_topic_test.go index f9df22222a..319b7b43c5 100644 --- a/foreign/go/binary_serialization/update_topic_serializer_test.go +++ b/foreign/go/contracts/update_topic_test.go @@ -15,26 +15,28 @@ // specific language governing permissions and limitations // under the License. -package binaryserialization +package iggcon import ( + "bytes" "testing" - - iggcon "github.com/apache/iggy/foreign/go/contracts" ) func TestSerialize_UpdateTopic(t *testing.T) { - streamId, _ := iggcon.NewIdentifier("stream") - topicId, _ := iggcon.NewIdentifier(uint32(1)) - request := TcpUpdateTopicRequest{ + streamId, _ := NewIdentifier("stream") + topicId, _ := NewIdentifier(uint32(1)) + request := UpdateTopic{ StreamId: streamId, TopicId: topicId, Name: "update_topic", - MessageExpiry: 100 * iggcon.Microsecond, + MessageExpiry: 100 * Microsecond, MaxTopicSize: 100, } - serialized1 := request.Serialize() + serialized1, err := request.MarshalBinary() + if err != nil { + t.Errorf("Failed to serialize UpdateTopic: %v", err) + } expected := []byte{ 0x02, // StreamId Kind (StringId) @@ -51,7 +53,7 @@ func TestSerialize_UpdateTopic(t *testing.T) { 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x5F, 0x74, 0x6F, 0x70, 0x69, 0x63, // Name ("update_topic") } - if !areBytesEqual(serialized1, expected) { + if !bytes.Equal(serialized1, expected) { t.Errorf("Test case 1 failed. \nExpected:\t%v\nGot:\t\t%v", expected, serialized1) } } diff --git a/foreign/go/contracts/update_user.go b/foreign/go/contracts/update_user.go new file mode 100644 index 0000000000..1dad2db14e --- /dev/null +++ b/foreign/go/contracts/update_user.go @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package iggcon + +type UpdateUser struct { + UserID Identifier `json:"-"` + Username *string `json:"username"` + Status *UserStatus `json:"userStatus"` +} + +func (u *UpdateUser) Code() CommandCode { + return UpdateUserCode +} + +func (u *UpdateUser) MarshalBinary() ([]byte, error) { + userIdBytes, err := u.UserID.MarshalBinary() + if err != nil { + return nil, err + } + length := len(userIdBytes) + + if u.Username == nil { + u.Username = new(string) + } + + username := *u.Username + + if len(username) != 0 { + length += 2 + len(username) + } + + if u.Status != nil { + length += 2 + } + + bytes := make([]byte, length+1) + position := 0 + + copy(bytes[position:position+len(userIdBytes)], userIdBytes) + position += len(userIdBytes) + + if len(username) != 0 { + bytes[position] = 1 + position++ + bytes[position] = byte(len(username)) + position++ + copy(bytes[position:position+len(username)], username) + position += len(username) + } else { + bytes[position] = 0 + position++ + } + + if u.Status != nil { + bytes[position] = 1 + position++ + statusByte := byte(0) + switch *u.Status { + case Active: + statusByte = 1 + case Inactive: + statusByte = 2 + } + bytes[position] = statusByte + } else { + bytes[position] = 0 + } + + return bytes, nil +} diff --git a/foreign/go/contracts/update_user_permissions.go b/foreign/go/contracts/update_user_permissions.go new file mode 100644 index 0000000000..952bd14d00 --- /dev/null +++ b/foreign/go/contracts/update_user_permissions.go @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package iggcon + +import "encoding/binary" + +type UpdatePermissions struct { + UserID Identifier `json:"-"` + Permissions *Permissions `json:"Permissions,omitempty"` +} + +func (u *UpdatePermissions) Code() CommandCode { + return UpdatePermissionsCode +} + +func (u *UpdatePermissions) MarshalBinary() ([]byte, error) { + userIdBytes, err := u.UserID.MarshalBinary() + if err != nil { + return nil, err + } + length := len(userIdBytes) + + if u.Permissions != nil { + length += 1 + 4 + u.Permissions.Size() + } + + bytes := make([]byte, length) + position := 0 + + copy(bytes[position:position+len(userIdBytes)], userIdBytes) + position += len(userIdBytes) + + if u.Permissions != nil { + bytes[position] = 1 + position++ + permissionsBytes, err := u.Permissions.MarshalBinary() + if err != nil { + return nil, err + } + binary.LittleEndian.PutUint32(bytes[position:position+4], uint32(len(permissionsBytes))) + position += 4 + copy(bytes[position:position+len(permissionsBytes)], permissionsBytes) + } else { + bytes[position] = 0 + } + + return bytes, nil +} diff --git a/foreign/go/contracts/users.go b/foreign/go/contracts/users.go index 3fcc051ce5..4667dfc2cf 100644 --- a/foreign/go/contracts/users.go +++ b/foreign/go/contracts/users.go @@ -17,29 +17,7 @@ package iggcon -type ChangePasswordRequest struct { - UserID Identifier `json:"-"` - CurrentPassword string `json:"CurrentPassword"` - NewPassword string `json:"NewPassword"` -} - -type UpdatePermissionsRequest struct { - UserID Identifier `json:"-"` - Permissions *Permissions `json:"Permissions,omitempty"` -} - -type UpdateUserRequest struct { - UserID Identifier `json:"-"` - Username *string `json:"username"` - Status *UserStatus `json:"userStatus"` -} - -type CreateUserRequest struct { - Username string `json:"username"` - Password string `json:"Password"` - Status UserStatus `json:"Status"` - Permissions *Permissions `json:"Permissions,omitempty"` -} +import "encoding/binary" type UserInfo struct { Id uint32 `json:"Id"` @@ -65,6 +43,100 @@ type Permissions struct { Streams map[int]*StreamPermissions `json:"Streams,omitempty"` } +func (p *Permissions) MarshalBinary() ([]byte, error) { + size := p.Size() + bytes := make([]byte, size) + + bytes[0] = boolToByte(p.Global.ManageServers) + bytes[1] = boolToByte(p.Global.ReadServers) + bytes[2] = boolToByte(p.Global.ManageUsers) + bytes[3] = boolToByte(p.Global.ReadUsers) + bytes[4] = boolToByte(p.Global.ManageStreams) + bytes[5] = boolToByte(p.Global.ReadStreams) + bytes[6] = boolToByte(p.Global.ManageTopics) + bytes[7] = boolToByte(p.Global.ReadTopics) + bytes[8] = boolToByte(p.Global.PollMessages) + bytes[9] = boolToByte(p.Global.SendMessages) + + position := 10 + + if p.Streams != nil { + bytes[position] = byte(1) + position += 1 + + for streamID, stream := range p.Streams { + binary.LittleEndian.PutUint32(bytes[position:position+4], uint32(streamID)) + position += 4 + + bytes[position] = boolToByte(stream.ManageStream) + bytes[position+1] = boolToByte(stream.ReadStream) + bytes[position+2] = boolToByte(stream.ManageTopics) + bytes[position+3] = boolToByte(stream.ReadTopics) + bytes[position+4] = boolToByte(stream.PollMessages) + bytes[position+5] = boolToByte(stream.SendMessages) + position += 6 + + if stream.Topics != nil { + bytes[position] = byte(1) + position += 1 + + for topicID, topic := range stream.Topics { + binary.LittleEndian.PutUint32(bytes[position:position+4], uint32(topicID)) + position += 4 + + bytes[position] = boolToByte(topic.ManageTopic) + bytes[position+1] = boolToByte(topic.ReadTopic) + bytes[position+2] = boolToByte(topic.PollMessages) + bytes[position+3] = boolToByte(topic.SendMessages) + position += 4 + + bytes[position] = byte(0) + position += 1 + } + } else { + bytes[position] = byte(0) + position += 1 + } + } + } else { + bytes[0] = byte(0) + } + + return bytes, nil +} + +func (p *Permissions) Size() int { + size := 10 + + if p.Streams != nil { + size += 1 + + for _, stream := range p.Streams { + size += 4 + size += 6 + size += 1 + + if stream.Topics != nil { + size += 1 + size += len(stream.Topics) * 9 + } else { + size += 1 + } + } + } else { + size += 1 + } + + return size +} + +func boolToByte(b bool) byte { + if b { + return 1 + } + return 0 +} + type GlobalPermissions struct { ManageServers bool `json:"ManageServers"` ReadServers bool `json:"ReadServers"`