Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void writeUnsignedVarint(int i) {

@Override
public void writeByteBuffer(ByteBuffer src) {
buf.put(src);
buf.put(src.duplicate());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,20 @@ public interface Message {
* If the specified version is too new to be supported
* by this software.
*/
int size(ObjectSerializationCache cache, short version);
default int size(ObjectSerializationCache cache, short version) {
MessageSizeAccumulator size = new MessageSizeAccumulator();
addSize(size, cache, version);
return size.totalSize();
}

/**
* Add the size of this message to an accumulator.
*
* @param size The size accumulator to add to
* @param cache The serialization size cache to populate.
* @param version The version to use.
*/
void addSize(MessageSizeAccumulator size, ObjectSerializationCache cache, short version);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking about how to simplify this process.

Could we reuse the method void write(Writable writable, ObjectSerializationCache cache, short version) ? Maybe we can create a Writable instance but it does not write data to any output. Instead, it calculate the size of buffer according to input data.


/**
* Writes out this message to the given Writable.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.protocol;

/**
* Helper class which facilitates zero-copy network transmission. See {@link SendBuilder}.
*/
public class MessageSizeAccumulator {
private int totalSize = 0;
private int zeroCopySize = 0;

/**
* Get the total size of the message.
*
* @return total size in bytes
*/
public int totalSize() {
return totalSize;
}

/**
* Get the total "zero-copy" size of the message. This is the summed
* total of all fields which have either have a type of 'bytes' with
* 'zeroCopy' enabled, or a type of 'records'
*
* @return total size of zero-copy data in the message
*/
public int zeroCopySize() {
return zeroCopySize;
}

public void addZeroCopyBytes(int size) {
zeroCopySize += size;
totalSize += size;
}

public void addBytes(int size) {
totalSize += size;
}

public void add(MessageSizeAccumulator size) {
this.totalSize += size.totalSize;
this.zeroCopySize += size.zeroCopySize;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,12 @@ public ObjectSerializationCache() {
this.map = new IdentityHashMap<>();
}

public void setArraySizeInBytes(Object o, int size) {
map.put(o, Integer.valueOf(size));
public void setArraySizeInBytes(Object o, Integer size) {
map.put(o, size);
}

public int getArraySizeInBytes(Object o) {
Object value = map.get(o);
Integer sizeInBytes = (Integer) value;
return sizeInBytes;
public Integer getArraySizeInBytes(Object o) {
return (Integer) map.get(o);
}

public void cacheSerializedValue(Object o, byte[] val) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.kafka.common.protocol;

import org.apache.kafka.common.protocol.types.RawTaggedField;

import org.apache.kafka.common.UUID;
import org.apache.kafka.common.protocol.types.RawTaggedField;
import org.apache.kafka.common.record.MemoryRecords;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -54,6 +54,16 @@ default List<RawTaggedField> readUnknownTaggedField(List<RawTaggedField> unknown
return unknowns;
}

default MemoryRecords readRecords(int length) {
if (length < 0) {
// no records
return null;
} else {
ByteBuffer recordsBuffer = readByteBuffer(length);
return MemoryRecords.readableRecords(recordsBuffer);
}
}

/**
* Read a UUID with the most significant digits first.
*/
Expand Down

This file was deleted.

This file was deleted.

Loading