KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage. (#10271)
This field is not present in the KIP. The KIP contains a field called LeaderEpoch, which is not seen here.
Should we update the KIP suitably?
Right, we will be updating the KIP with this change.
This field is not present in the KIP. The KIP contains a field called Epoch, which is not seen here.
Should we update the KIP suitably?
Right, we will be updating the KIP with this change.
The ProducerRecord can hold a key and a value. It seems like we could store the API key in ProducerRecord.key() and store the serialized payload in ProducerRecord.value(). Why not take that route instead of serializing to a single byte array here containing (apiKey, version, payload)?
https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
This is of course assuming that RemoteLogMetadataContextSerde will only be used for serializing/deserializing events to/from a Kafka topic (as the class doc suggests).
ProducerRecord key is generally used to route to a partition based on the key or for compaction. But we do not want to route the requests based on the apiKey here and it is not a compacted topic.
apiKey and version are used to unmarshal the payload. IMHO, I see that as a complete structure for the record value and it should be part of the value instead of spreading in record key and value.
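For illustration, the single-value framing described here — apiKey, version, and payload packed together into the record value — might look roughly like the sketch below. The class name, fixed-width header layout, and helper methods are assumptions for this sketch, not the actual PR code (the PR uses varint encoding).

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of packing (apiKey, version, payload) into a single
// record value, as discussed above. The 2-byte fixed-width header fields are
// an assumption for this illustration; the PR itself uses unsigned varints.
public class FramingSketch {

    // Header: 2 bytes apiKey + 2 bytes version, followed by the raw payload.
    public static byte[] pack(short apiKey, short version, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(2 + 2 + payload.length);
        buf.putShort(apiKey);
        buf.putShort(version);
        buf.put(payload);
        return buf.array();
    }

    public static short apiKeyOf(byte[] framed) {
        return ByteBuffer.wrap(framed).getShort(0);
    }

    public static short versionOf(byte[] framed) {
        return ByteBuffer.wrap(framed).getShort(2);
    }

    public static byte[] payloadOf(byte[] framed) {
        byte[] payload = new byte[framed.length - 4];
        ByteBuffer.wrap(framed, 4, payload.length).get(payload);
        return payload;
    }
}
```

Because the complete (apiKey, version, payload) structure lives in the value, the record key stays free for partitioning/compaction semantics, which is the point being made above.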
Could we set flexibleVersions to 0+ so that it means optional attributes are supported from initial version?
Same question applies to other message type definitions.
Good point, I will add that.
The nesting is getting deeper here, could we ease it out by extracting the code that creates a RemoteLogSegmentIdEntry from a RemoteLogSegmentId into a separate helper function? It seems like we could define it as a public static function in RemoteLogSegmentId and then we will be able to use it here and in RemoteLogSegmentMetadataUpdateSerde.serialize(...).
+1 to extracting them to a separate method for better readability; updated in the latest commit.
But RemoteLogSegmentIdEntry is created separately in RemoteLogSegmentRecord and RemoteLogSegmentUpdateRecord, so they cannot be shared right now. Is there a way to create a type and import/include it in message definitions?
Hmm, why do we need to create and apply an update here?
We did not expose RemoteLogSegmentMetadata constructor to take state as it is always created with RemoteLogSegmentState.COPY_SEGMENT_STARTED and it should always be updated with RemoteLogSegmentMetadataUpdate.
Should we rename this function as toBytes() and symmetrically rename the other deserialize() function above as fromBytes()?
I am fine with changing transformToBytes to toBytes. But I prefer to keep serialize and deserialize as they are the respective implementations of serializer and deserializer.
…d on an internal topic as storage. Added serdes for the default implementation of RLMM based on an internal topic as storage. This topic will receive events of RemoteLogSegmentMetadata, RemoteLogSegmentUpdate, and RemotePartitionDeleteMetadata. These events are serialized into the Kafka protocol message format. Added tests for all the event types. This is part of the tiered storage implementation KIP-405.
@junrao This PR is rebased with the latest trunk and it is ready for review.
…/services in storage module.
```java
 *
 * You can read more details in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-MessageFormat">KIP-405</a>
 */
public class RemoteLogMetadataContext {
```
This class is very similar to ApiMessageAndVersion. Both the metadata and the storage module need to serialize/deserialize ApiMessage. Instead of creating different variants, I am wondering if it would be better to put the common code in a new server-side module or the client module under the server package that can be shared.
This is slightly different from ApiMessageAndVersion. This class does not hold the payload as a Message; instead it holds a RemoteLogSegmentMetadata, RemoteLogSegmentMetadataUpdate, or RemotePartitionDeleteMetadata. This simplifies things for producers/consumers of the remote log metadata topic, as they expect to send/receive these POJOs rather than Message objects. So it is simpler for producers/consumers to have the Message-to-POJO marshalling/unmarshalling in the serde.
Refactored RemoteLogMetadataSerdes#serialize to return ByteBuffer instead of Message object
```java
private final Serializer<RemoteLogMetadataContext> rootSerializer;

public RemoteLogMetadataContextSerdes() {
    rootSerializer = (topic, data) -> serialize(data);
```
Overall, this seems to be a convoluted way of doing serialization. (1) The logic is kind of hard to follow due to the indirections. (2) The Serde api is really designed for clients. So things like topic, configure() and close() don't really apply here.
I am wondering if we could do the following.
(a) Have a generic ApiMessageSerde that converts between a generic ApiMessageContext (with ApiMessage and version) and bytes. The serializer will write apiKey, version, payload in that order.
(b) Have a remote storage specific RemoteStorageSerde that wraps ApiMessageSerde and converts ApiMessageContext from/to RemoteStorageContext (with remote storage specific objects).
This way, if we have other usage of serializing ApiMessage into a log, we could reuse (a).
There are currently only two layers of serialization. Even with ApiMessageSerde there will be at least two layers, but ApiMessageSerde can be reused in the future.
I plan to rename RemoteLogMetadataSerde to RemoteLogMetadataTransform
- to avoid confusion with serde methods like serialize/deserialize.
- as it is really a transform of POJO to APIMessageAndVersion or vice versa.
For ApiMessageSerde, I plan to reuse ApiMessageAndVersion as you suggested earlier. This will be similar to the below code.
```java
/**
 * This abstract class provides serialization/deserialization of {@code ApiMessageAndVersion}.
 * <p></p>
 * Implementors need to extend this class and implement {@link #apiMessageFor(short)} method to return a respective
 * {@code ApiMessage} for the given {@code apiKey}. This is required to deserialize the bytes to build the respective
 * {@code ApiMessage} instance.
 */
public abstract class AbstractApiMessageAndVersionSerde implements Serde<ApiMessageAndVersion> {

    private final Serializer<ApiMessageAndVersion> serializer;
    private final Deserializer<ApiMessageAndVersion> deserializer;

    public AbstractApiMessageAndVersionSerde() {
        serializer = (topic, data) -> doSerialize(data);
        deserializer = (topic, data) -> doDeserialize(data);
    }

    private byte[] doSerialize(ApiMessageAndVersion messageAndVersion) {
        ObjectSerializationCache cache = new ObjectSerializationCache();
        short version = messageAndVersion.version();
        ApiMessage message = messageAndVersion.message();

        // Add header containing apiKey and apiVersion,
        // headerSize is 1 byte for apiKey and 1 byte for apiVersion
        int headerSize = 1 + 1;
        int messageSize = message.size(cache, version);
        ByteBufferAccessor writable = new ByteBufferAccessor(ByteBuffer.allocate(headerSize + messageSize));

        // Write apiKey and version
        writable.writeUnsignedVarint(message.apiKey());
        writable.writeUnsignedVarint(version);

        // Write the message
        message.write(writable, cache, version);

        return writable.buffer().array();
    }

    private ApiMessageAndVersion doDeserialize(byte[] data) {
        ByteBufferAccessor readable = new ByteBufferAccessor(ByteBuffer.wrap(data));

        short apiKey = (short) readable.readUnsignedVarint();
        short version = (short) readable.readUnsignedVarint();

        ApiMessage message = apiMessageFor(apiKey);
        message.read(readable, version);

        return new ApiMessageAndVersion(message, version);
    }

    /**
     * Return {@code ApiMessage} instance for the given {@code apiKey}. This is used while deserializing the bytes
     * payload into the respective {@code ApiMessage} in {@link #doDeserialize(byte[])} method.
     *
     * @param apiKey apiKey for which a {@code ApiMessage} to be created.
     */
    public abstract ApiMessage apiMessageFor(short apiKey);

    @Override
    public Serializer<ApiMessageAndVersion> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<ApiMessageAndVersion> deserializer() {
        return deserializer;
    }
}
```
```java
/**
 * This interface is about transforming metadata objects into the respective {@link ApiMessageAndVersion} or vice versa.
 * <p></p>
 * Those metadata objects can be {@link org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata},
 * {@link org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate}, or
 * {@link org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata}.
 * <p>
 * @param <T> metadata type.
 *
 * @see RemoteLogSegmentMetadataTransform
 * @see RemoteLogSegmentMetadataUpdateTransform
 * @see RemotePartitionDeleteMetadataTransform
 */
public interface RemoteLogMetadataTransform<T> {

    /**
     * Transforms the given {@code metadata} object into the respective {@code ApiMessageAndVersion} object.
     *
     * @param metadata metadata object to be serialized.
     * @return transformed {@code ApiMessageAndVersion} object.
     */
    ApiMessageAndVersion toApiMessageAndVersion(T metadata);

    /**
     * Return the deserialized object for the given payload and the version.
     *
     * @param apiMessageAndVersion ApiMessageAndVersion object to be transformed.
     * @return transformed {@code T} metadata object.
     */
    T fromApiMessageAndVersion(ApiMessageAndVersion apiMessageAndVersion);
}
```
Is there any benefit to using Serde? It's intended for clients to plug in a topic specific Serde. Here, we are creating a generic Serde for ApiMessage independent of the topics, so using Serde doesn't seem to be very natural here.
If we don't use Serde, AbstractApiMessageAndVersionSerde could just implement serialize() and deserialize() methods directly, without the topic parameter, which avoids a level of indirection.
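A rough sketch of the no-Serde alternative suggested here — plain serialize()/deserialize() methods with no unused topic parameter — assuming the varint header framing shown in the earlier comment. All names below are illustrative stand-ins, not the real Kafka classes:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical sketch of a direct serialize()/deserialize() pair without the
// Serde interface. The varint header (apiKey, then version) mirrors the
// AbstractApiMessageAndVersionSerde snippet above; class and method names
// here are assumptions for illustration only.
public class DirectSerdeSketch {

    // Frame: unsigned-varint apiKey, unsigned-varint version, then the payload.
    public byte[] serialize(short apiKey, short version, byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(out, apiKey);
        writeUnsignedVarint(out, version);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    // Returns {apiKey, version, payloadOffset} read back from the framed bytes.
    public int[] readHeader(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        int apiKey = readUnsignedVarint(buf);
        int version = readUnsignedVarint(buf);
        return new int[]{apiKey, version, buf.position()};
    }

    private static void writeUnsignedVarint(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    private static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        byte b;
        do {
            b = buf.get();
            value |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return value;
    }
}
```

No topic string, no configure()/close() lifecycle — the caller invokes serialize()/deserialize() directly, which is the level-of-indirection point being made.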
The Serde deserializer can be used by developers to build any tools they need on top of the remote log metadata topic, by setting it as the value deserializer for a consumer. But they can also use the byte array deserializer on the consumer and invoke RemoteLogContextDeserializer on each message to convert the byte array into a RemoteLogContext. I updated the PR with the suggested changes.
@satishd : What you said makes sense. It's useful to be able to use the same Serde in tools or client applications. Also, it seems that it's possible to make a generic Serde for all ApiMessage since the apiKey is unique within a topic and the topic is passed into Serde.
I will remove this dependency once I move ApiMessageAndVersion to clients module. This is done in a followup PR #10271.
…updated the tests to check the same. Minor cleanups.
```java
 *
 * @param apiKey apiKey for which a {@code ApiMessage} to be created.
 */
public abstract ApiMessage apiMessageFor(short apiKey);
```
Another option is for all ApiMessageAndVersion that need to be stored in a log to share the same ID space. Then, we don't need to have this method.
Are you suggesting that all the metadata log related modules share the same id space? For example, the RAFT metadata and remote log metadata would share the same ID space. IMHO, we should not go that way, as it looks like an abstraction leak to me.
We can have an uber serializer/deserializer at the tool level that uses the underlying serdes based on the respective metadata topic. We can introduce this serializer/deserializer in the tools module.
One potential benefit of a shared id space is that an ApiMessage could be used in more than one topic. There is no such use case now; I am wondering if there will be one in the future. If not, we could choose not to share the id space.
I guess it is very unlikely to have such a use case in the future.
If we encounter such a use case in the future, we can have a workaround to define a message in another topic and create a converter for one message definition to the other.
AbstractApiMessageSerde is used as the underlying serialization mechanism for the remote log metadata topic. Renamed `AbstractMetadataRecordSerde` to `AbstractApiMessageSerde`. Renamed `RemoteLogMetadataContextSerde` to `RemoteLogMetadataSerde`. Extracted a `RemoteLogMetadata` base class from `RemoteLogSegmentMetadata` etc.
@junrao: I will have a followup PR for moving …
```java
private static final short REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = new RemoteLogSegmentMetadataUpdateRecord().apiKey();
private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey();

private static final Map<String, Short> REMOTE_LOG_STORAGE_CLASS_TO_API_KEY = createRemoteLogStorageClassToApiKeyMap();
```
Perhaps we could just use Class as the key instead of class name?
I initially had Class as the key but later changed it to the class name, i.e. a String. Class instances may not be equal if they are not loaded by the same classloader. The class name (a String) was taken as the key to avoid such remote but possible scenarios in the future.
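A minimal sketch of the class-name-keyed lookup described here, using hypothetical stand-in metadata types and made-up apiKey values (in the PR the map is built from the generated record classes):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: key the apiKey lookup by class *name* rather than by
// Class, so that equivalent classes loaded by different classloaders still
// resolve to the same entry. The metadata types and apiKey values below are
// illustrative stand-ins, not the real generated records.
public class ApiKeyLookupSketch {

    static class RemoteLogSegmentMetadata { }
    static class RemotePartitionDeleteMetadata { }

    private static final Map<String, Short> CLASS_NAME_TO_API_KEY = new HashMap<>();
    static {
        CLASS_NAME_TO_API_KEY.put(RemoteLogSegmentMetadata.class.getName(), (short) 0);
        CLASS_NAME_TO_API_KEY.put(RemotePartitionDeleteMetadata.class.getName(), (short) 2);
    }

    public static short apiKeyFor(Object metadata) {
        // Lookup by name avoids identity comparisons between Class objects.
        Short apiKey = CLASS_NAME_TO_API_KEY.get(metadata.getClass().getName());
        if (apiKey == null) {
            throw new IllegalArgumentException("Unknown metadata type: " + metadata.getClass());
        }
        return apiKey;
    }
}
```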
```java
 * This class provides serialization and deserialization for {@link RemoteLogMetadata}. This is the root serde
 * for the messages that are stored in the internal remote log metadata topic.
 */
public class RemoteLogMetadataSerde {
```
Since we just need a single instance of this class, perhaps we could have a static public method to return the singleton and make the public constructor private?
I prefer avoiding singletons if possible. I have changed this class to be open and extensible. :) This can be useful in the future if we need to extend this for tests or other purposes.
Changed remote log metadata message definition types to metadata and enabled metadata record generation. `RemoteLogMetadataSerde` is made open for extension. Cleaned up renamed/removed class references.
```java
byte[] metadataBytes = serde.serialize(remoteLogMetadata);

// Deserialize the bytes and check the RemoteLogMetadata object is as expected.
RemoteLogMetadataSerde deserializer = new RemoteLogMetadataSerde();
```
Do we need to instantiate the Serde a second time?
Created another RemoteLogMetadataSerde instance to depict the real use case of the serializer and deserializer having their own instances. Added a comment on the same.
Fixed checkstyle in raft module. Added a comment in `RemoteLogMetadataSerdeTest`.
Thanks @junrao for the comments. Fixed checkstyle in raft module and added a comment in `RemoteLogMetadataSerdeTest`.
```java
", startOffset=" + startOffset +
", endOffset=" + endOffset +
", brokerId=" + brokerId +
", brokerId=" + brokerId() +
```
Perhaps it is worth considering using StringBuilder here given that we are doing a number of string concatenations (~10) now.
https://docs.oracle.com/javase/7/docs/api/java/lang/StringBuilder.html
AFAIK, javac (JDK 8+) already converts that into a StringBuilder, so it is not required to replace + with StringBuilder concatenation code here.
Actually, we should avoid doing that, as javac has better optimizations like JEP 280.
Interesting, there are tons of references to StringBuilder in the AK code base today: https://github.com/apache/kafka/search?p=1&q=StringBuilder. Are you saying most of these are overkill?
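As an aside, the equivalence behind the javac point can be illustrated with the two toString styles below: javac compiles the + chain to StringBuilder appends (JDK 8) or to an invokedynamic call under JEP 280 (JDK 9+), so both forms produce identical output. The class, method, and field names here are illustrative, not the PR's code:

```java
// Both toString styles yield the same string; javac already rewrites the `+`
// chain, so hand-writing StringBuilder buys nothing for a one-shot expression.
public class ToStringSketch {

    public static String withConcat(long startOffset, long endOffset, int brokerId) {
        return "RemoteLogSegmentMetadata{" +
                "startOffset=" + startOffset +
                ", endOffset=" + endOffset +
                ", brokerId=" + brokerId +
                '}';
    }

    public static String withBuilder(long startOffset, long endOffset, int brokerId) {
        return new StringBuilder("RemoteLogSegmentMetadata{")
                .append("startOffset=").append(startOffset)
                .append(", endOffset=").append(endOffset)
                .append(", brokerId=").append(brokerId)
                .append('}')
                .toString();
    }
}
```

Explicit StringBuilder still pays off when a string is built incrementally across loop iterations or multiple statements, which may explain many of the existing usages in the codebase.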
```java
}

public byte[] serialize(RemoteLogMetadata remoteLogMetadata) {
    Short apiKey = remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName());
```
This 2-way map lookup feels a bit complex to me. It appears the requirement here is that you need the apiKey corresponding to the provided RemoteLogMetadata. To make it simpler, why not provide a short apiKey() abstract method in the RemoteLogMetadata base class and ask the specializing classes to implement it? You can then use the remoteLogMetadata.apiKey() method to get the apiKey here. This avoids the need to maintain 2 maps within this class; you will only need the remoteLogMetadataTransform map.
apiKey is an internal serialization detail of one of the implementations. We should not leak that into a public API like RemoteLogMetadata and its subclasses.
I guess we can continue discussion here: #10271 (comment) ?
```java
 * {@code ApiMessage} for the given {@code apiKey}. This is required to deserialize the bytes to build the respective
 * {@code ApiMessage} instance.
 */
public abstract class BytesApiMessageSerde {
```
Why do we need this functionality in a separate class? Currently the only user of this class seems to be RemoteLogMetadataSerde. Instead if we just merge the implementation of this class into RemoteLogMetadataSerde, it looks simpler to me.
BytesApiMessageSerde is a generic implementation that can be used by others. As I said in my earlier comment it will be moved to clients module.
@satishd I'm a little confused, where do you foresee this being reused by others, i.e. other than RemoteLogMetadataSerde? Do you have any use cases in mind? I see that it is useful to keep it generic, but without a few clear use cases it comes at the expense of added complexity.
I think the idea is that BytesApiMessageSerde could be reused for any future implementation that wants to serialize ApiMessage into an internal topic. AbstractApiMessageSerde allows the users to have an optimized implementation of Writable, for example, reusing a preallocated ByteBuffer (which the Raft layer uses). However, for most use cases, BytesApiMessageSerde is probably good enough and easier to use.
```java
import java.util.Map;
import java.util.stream.Collectors;

public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadata> {
```
Does this class have unit test coverage?
They were indirectly covered through RemoteLogMetadataSerdeTest, but I added individual unit tests in the latest commit 46e0597.
Sounds good, I'll take a look.
```java
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;

public class RemoteLogSegmentMetadataUpdateTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadataUpdate> {
```
Does this class have unit test coverage?
```java
import java.util.Map;
import java.util.stream.Collectors;

public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadata> {
```
Instead of introducing this separate class, why not have RemoteLogSegmentMetadata implement the RemoteLogMetadataTransform< RemoteLogSegmentMetadata> interface directly?
RemoteLogSegmentMetadata is a public API and we do not want to add any specific serialization implementation details into that.
@satishd interesting, so are you saying RemoteLogSegmentMetadata being public means it could be used outside the Broker binary? If so, where?
I think what Satish means is that RemoteLogSegmentMetadata will be used in any remote storage plugin implementation. So, we want to be a bit careful about the public methods exposed there. Public facing methods are harder to evolve and we don't want to expose unnecessary public methods.
Added RemoteLogMetadataTransformTest with unit tests for RemoteLogSegmentMetadataTransform, RemoteLogSegmentMetadataUpdateTransform, and RemotePartitionDeleteMetadataTransform.
Thanks @kowshik for your comments. Addressed with replies, updated the PR with commit #10271 (comment)
```java
 */
private final long eventTimestampMs;

public RemoteLogMetadata(int brokerId, long eventTimestampMs) {
```
Should this be declared protected since this is an abstract class anyway?
```java
record.maxTimestampMs(), record.brokerId(),
record.eventTimestampMs(), record.segmentSizeInBytes(),
segmentLeaderEpochs);
RemoteLogSegmentMetadataUpdate rlsmUpdate
```
Maybe I'm missing something, but there seems to be a RemoteLogSegmentMetadata constructor overload that also allows you to pass the RemoteLogSegmentState as one of the parameters: https://github.com/apache/kafka/blob/trunk/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java#L99
If we use the overloaded version, do we still need to apply the RemoteLogSegmentMetadataUpdate here?
That constructor is kept private, and we wanted to control which fields can be changed on an existing instance via RemoteLogSegmentMetadataUpdate.
Changed RemoteLogMetadata constructor access from public to protected as it is an abstract class.
```java
 *
 * @param apiKey apiKey for which a {@code ApiMessage} to be created.
 */
public abstract ApiMessage apiMessageFor(short apiKey);
```
Hello @satishd, why should we make this abstract? I find that the two implementations of this method are all MetadataRecordType.fromId(apiKey).newMetadataRecord().
@dengziming Below are the two implementations of AbstractApiMessageSerde, and they do not share the same record types:
- BytesApiMessageSerde: `BytesApiMessageSerde.this.apiMessageFor(apiKey);`
- MetadataRecordSerde: `MetadataRecordType.fromId(apiKey).newMetadataRecord();`
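A minimal sketch of why keeping apiMessageFor(short) abstract makes sense: each serde maps the same apiKey space onto a different family of record types, so there is no shared implementation to pull up. All types below are illustrative stubs, not the actual Kafka classes:

```java
// Hypothetical sketch: two serde subclasses resolve the same apiKey to records
// from different modules, which is why apiMessageFor(short) stays abstract.
// Every type here is an illustrative stub, not a real Kafka class.
public class ApiMessageForSketch {

    interface ApiMessage { short apiKey(); }

    // Stub stand-ins for generated records in two different modules.
    static final class RemoteLogSegmentMetadataRecordStub implements ApiMessage {
        public short apiKey() { return 0; }
    }
    static final class RegisterBrokerRecordStub implements ApiMessage {
        public short apiKey() { return 0; }
    }

    abstract static class AbstractApiMessageSerdeSketch {
        abstract ApiMessage apiMessageFor(short apiKey);
    }

    // Remote-storage variant: apiKey 0 means a remote log segment record here.
    static final class RemoteLogMetadataSerdeSketch extends AbstractApiMessageSerdeSketch {
        ApiMessage apiMessageFor(short apiKey) {
            if (apiKey == 0) return new RemoteLogSegmentMetadataRecordStub();
            throw new IllegalArgumentException("Unknown apiKey: " + apiKey);
        }
    }

    // Raft-metadata variant: the same apiKey 0 means a broker record instead.
    static final class MetadataRecordSerdeSketch extends AbstractApiMessageSerdeSketch {
        ApiMessage apiMessageFor(short apiKey) {
            if (apiKey == 0) return new RegisterBrokerRecordStub();
            throw new IllegalArgumentException("Unknown apiKey: " + apiKey);
        }
    }
}
```

Because the id spaces are per-topic, the same numeric apiKey can legitimately resolve to different record types in the two serdes, which matches the earlier discussion about not sharing the id space.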
…d on an internal topic as storage. (apache#10271) KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage. This topic will receive events of RemoteLogSegmentMetadata, RemoteLogSegmentUpdate, and RemotePartitionDeleteMetadata. These events are serialized into Kafka protocol message format. Added tests for all the event types for that topic. This is part of the tiered storage implementation KIP-405. Reviewers: Kowshik Prakasam <kprakasam@confluent.io>, Jun Rao <junrao@gmail.com>
…d on an internal topic as storage. (apache#10271) Summary: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage. This topic will receive events of RemoteLogSegmentMetadata, RemoteLogSegmentUpdate, and RemotePartitionDeleteMetadata. These events are serialized into Kafka protocol message format. Added tests for all the event types for that topic. This is part of the tiered storage implementation KIP-405. apache-reviewers: Kowshik Prakasam <kprakasam@confluent.io>, Jun Rao <junrao@gmail.com> (cherry picked from commit a1367f5) Reviewers: #ldap_kafka_admins, kchandraprakash Reviewed By: #ldap_kafka_admins, kchandraprakash JIRA Issues: DKAFC-868 Differential Revision: https://code.uberinternal.com/D6303225
KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage. This topic will receive events of RemoteLogSegmentMetadata, RemoteLogSegmentUpdate, and RemotePartitionDeleteMetadata. These events are serialized into Kafka protocol message format.
Added tests for all the event types for that topic.
This is part of the tiered storage implementation KIP-405.