Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f50a0f5
[KAFKA-8522] Implementing proposal as outlined in KIP-534
ConcurrencyPractitioner Oct 27, 2019
0886382
Adding comments
ConcurrencyPractitioner Nov 10, 2019
bff17b6
Adding mechanism to modify first timestamp
ConcurrencyPractitioner Nov 11, 2019
4d6b5a1
Cleaning things up
ConcurrencyPractitioner Nov 16, 2019
aac26a7
Fixing checkstyle
ConcurrencyPractitioner Nov 16, 2019
f97478d
Fixing funny logic
ConcurrencyPractitioner Nov 19, 2019
8250b90
Fixing minor test
ConcurrencyPractitioner Nov 21, 2019
b300152
Addressing most comments
ConcurrencyPractitioner Dec 2, 2019
7301fa9
Fixing all comments
ConcurrencyPractitioner Dec 3, 2019
e209c4b
Addressing most comments
ConcurrencyPractitioner Dec 6, 2019
50ec279
Attenpting zome fixes
ConcurrencyPractitioner Dec 8, 2019
6ae485c
checkstyle
ConcurrencyPractitioner Dec 8, 2019
c18ff6d
Fixing some tests
ConcurrencyPractitioner Dec 25, 2019
7ed83bf
Fixing most failed tests
ConcurrencyPractitioner Dec 27, 2019
d32bcb5
Modifying variable arg
ConcurrencyPractitioner Dec 27, 2019
6c1425c
Modifying variable arg
ConcurrencyPractitioner Dec 27, 2019
8b2d2b1
Fixing stuff
ConcurrencyPractitioner Dec 27, 2019
649e924
Removing some info statements
ConcurrencyPractitioner Dec 28, 2019
fdfa141
Fixing wack structure
ConcurrencyPractitioner Dec 28, 2019
0b9c49c
Fixing last remaining major issue
ConcurrencyPractitioner Dec 28, 2019
1929e7e
Changing up stuff
ConcurrencyPractitioner Dec 28, 2019
43fe252
Adding test
ConcurrencyPractitioner Dec 30, 2019
4138885
Adding some comments and one more test
ConcurrencyPractitioner Dec 31, 2019
f43804f
Adding fixed integration test
ConcurrencyPractitioner Dec 31, 2019
edfcb7d
Wrapping up test
ConcurrencyPractitioner Dec 31, 2019
e07e5f8
Putting up correct test
ConcurrencyPractitioner Jan 2, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
<module name="MethodLength"/>
<module name="ParameterNumber">
<!-- default is 8 -->
<property name="max" value="13"/>
<property name="max" value="14"/>
</module>
<module name="ClassDataAbstractionCoupling">
<!-- default is 7 -->
Expand All @@ -124,7 +124,7 @@
</module>
<module name="CyclomaticComplexity">
<!-- default is 10-->
<property name="max" value="16"/>
<property name="max" value="17"/>
</module>
<module name="JavaNCSS">
<!-- default is 50 -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,16 @@ public LegacyRecord outerRecord() {
return record;
}

@Override
public long deleteHorizonMs() {
    // This batch representation carries no delete-horizon attribute
    // (presumably a pre-v2 / legacy format — confirm against enclosing class),
    // so no horizon timestamp is ever available.
    return RecordBatch.NO_TIMESTAMP;
}

@Override
public boolean deleteHorizonSet() {
    // The delete-horizon flag does not exist in this batch representation,
    // so it can never be set.
    return false;
}

@Override
public boolean equals(Object o) {
if (this == o)
Expand Down Expand Up @@ -468,6 +478,16 @@ public long offset() {
return buffer.getLong(OFFSET_OFFSET);
}

@Override
public long deleteHorizonMs() {
    // No delete-horizon attribute exists for this batch representation;
    // always report "no timestamp".
    return RecordBatch.NO_TIMESTAMP;
}

@Override
public boolean deleteHorizonSet() {
    // This format has no delete-horizon flag bit; unconditionally unset.
    return false;
}

@Override
public LegacyRecord outerRecord() {
return record;
Expand Down Expand Up @@ -557,6 +577,16 @@ public long baseOffset() {
return loadFullBatch().baseOffset();
}

@Override
public long deleteHorizonMs() {
    // Delete horizon is not representable in this batch format; return the
    // sentinel rather than loading the full batch.
    return RecordBatch.NO_TIMESTAMP;
}

@Override
public boolean deleteHorizonSet() {
    // No delete-horizon flag in this format, so it is never set.
    return false;
}

@Override
public long lastOffset() {
return offset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,15 @@
* by the broker and is preserved after compaction. Additionally, the MaxTimestamp of an empty batch always retains
* the previous value prior to becoming empty.
*
* The delete horizon flag (the sixth bit) indicates whether the first timestamp of the batch has been set to
* the time at which tombstones / transaction markers need to be removed. If it is true, the first timestamp is
* the delete horizon; otherwise, it is merely the first timestamp of the record batch.
*
* The current attributes are given below:
*
* -------------------------------------------------------------------------------------------------
* | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
* -------------------------------------------------------------------------------------------------
* ---------------------------------------------------------------------------------------------------------------------------
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add some doc on how Delete Horizon flag is being used?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* | Unused (7-15) | Delete Horizon Flag (6) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) |
* ---------------------------------------------------------------------------------------------------------------------------
*/
public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
static final int BASE_OFFSET_OFFSET = 0;
Expand Down Expand Up @@ -128,6 +132,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
private static final byte COMPRESSION_CODEC_MASK = 0x07;
private static final byte TRANSACTIONAL_FLAG_MASK = 0x10;
private static final int CONTROL_FLAG_MASK = 0x20;
private static final byte DELETE_HORIZON_FLAG_MASK = 0x40;
private static final byte TIMESTAMP_TYPE_MASK = 0x08;

private static final int MAX_SKIP_BUFFER_SIZE = 2048;
Expand Down Expand Up @@ -155,10 +160,15 @@ public void ensureValid() {
}

/**
* Get the timestamp of the first record in this batch. It is always the create time of the record even if the
* Get the timestamp of the first record in this batch. It is usually the create time of the record even if the
* timestamp type of the batch is log append time.
*
* @return The first timestamp or {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
* Note that the first timestamp may have been set to the delete horizon of the batch,
* in which case the delete horizon is returned instead.
*
* @return The first timestamp if the batch's delete horizon has not been set
* The delete horizon if the batch's delete horizon has been set
* {@link RecordBatch#NO_TIMESTAMP} if the batch is empty
*/
public long firstTimestamp() {
return buffer.getLong(FIRST_TIMESTAMP_OFFSET);
Expand Down Expand Up @@ -245,6 +255,18 @@ public boolean isTransactional() {
return (attributes() & TRANSACTIONAL_FLAG_MASK) > 0;
}

/**
 * Checks the delete-horizon bit (0x40) of the batch attributes.
 *
 * @return true when the flag is set, meaning the first-timestamp field of
 *         this batch holds the delete horizon rather than the timestamp of
 *         the first record
 */
@Override
public boolean deleteHorizonSet() {
    final int attrs = attributes();
    return (attrs & DELETE_HORIZON_FLAG_MASK) != 0;
}

/**
 * Returns the delete horizon of this batch — the time used to decide when
 * tombstones / transaction markers may be removed. When the delete-horizon
 * flag is set, the horizon is stored in the first-timestamp field of the
 * batch header.
 *
 * @return the delete horizon in milliseconds if the flag is set, otherwise
 *         {@link RecordBatch#NO_TIMESTAMP}
 */
@Override
public long deleteHorizonMs() {
    return deleteHorizonSet() ? firstTimestamp() : RecordBatch.NO_TIMESTAMP;
}

@Override
public boolean isControlBatch() {
return (attributes() & CONTROL_FLAG_MASK) > 0;
Expand Down Expand Up @@ -360,7 +382,7 @@ public void setMaxTimestamp(TimestampType timestampType, long maxTimestamp) {
if (timestampType() == timestampType && currentMaxTimestamp == maxTimestamp)
return;

byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch());
byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch(), deleteHorizonSet());
buffer.putShort(ATTRIBUTES_OFFSET, attributes);
buffer.putLong(MAX_TIMESTAMP_OFFSET, maxTimestamp);
long crc = computeChecksum();
Expand Down Expand Up @@ -407,7 +429,7 @@ public int hashCode() {
}

private static byte computeAttributes(CompressionType type, TimestampType timestampType,
boolean isTransactional, boolean isControl) {
boolean isTransactional, boolean isControl, boolean isDeleteHorizonSet) {
if (timestampType == TimestampType.NO_TIMESTAMP_TYPE)
throw new IllegalArgumentException("Timestamp type must be provided to compute attributes for message " +
"format v2 and above");
Expand All @@ -419,6 +441,8 @@ private static byte computeAttributes(CompressionType type, TimestampType timest
attributes |= COMPRESSION_CODEC_MASK & type.id;
if (timestampType == TimestampType.LOG_APPEND_TIME)
attributes |= TIMESTAMP_TYPE_MASK;
if (isDeleteHorizonSet)
attributes |= DELETE_HORIZON_FLAG_MASK;
return attributes;
}

Expand All @@ -435,9 +459,49 @@ public static void writeEmptyHeader(ByteBuffer buffer,
boolean isTransactional,
boolean isControlRecord) {
int offsetDelta = (int) (lastOffset - baseOffset);
writeHeader(buffer, baseOffset, offsetDelta, DefaultRecordBatch.RECORD_BATCH_OVERHEAD, magic,
CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId,
producerEpoch, baseSequence, isTransactional, isControlRecord, false, partitionLeaderEpoch, 0);
}

public static void writeEmptyHeader(ByteBuffer buffer,
byte magic,
long producerId,
short producerEpoch,
int baseSequence,
long baseOffset,
long lastOffset,
int partitionLeaderEpoch,
TimestampType timestampType,
long timestamp,
boolean isTransactional,
boolean isControlRecord,
boolean isDeleteHorizonSet) {
int offsetDelta = (int) (lastOffset - baseOffset);
writeHeader(buffer, baseOffset, offsetDelta, DefaultRecordBatch.RECORD_BATCH_OVERHEAD, magic,
CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId,
producerEpoch, baseSequence, isTransactional, isControlRecord, partitionLeaderEpoch, 0);
producerEpoch, baseSequence, isTransactional, isControlRecord, isDeleteHorizonSet, partitionLeaderEpoch, 0);
}

/**
 * Writes a record batch header with the delete-horizon flag cleared.
 * Pure delegation to the full {@code writeHeader} variant with
 * {@code isDeleteHorizonSet = false}, preserving the original call
 * signature for existing callers that never set a delete horizon.
 */
static void writeHeader(ByteBuffer buffer,
                        long baseOffset,
                        int lastOffsetDelta,
                        int sizeInBytes,
                        byte magic,
                        CompressionType compressionType,
                        TimestampType timestampType,
                        long firstTimestamp,
                        long maxTimestamp,
                        long producerId,
                        short epoch,
                        int sequence,
                        boolean isTransactional,
                        boolean isControlBatch,
                        int partitionLeaderEpoch,
                        int numRecords) {
    // false => batches written through this overload never carry a delete
    // horizon in the first-timestamp field.
    writeHeader(buffer, baseOffset, lastOffsetDelta, sizeInBytes, magic, compressionType,
            timestampType, firstTimestamp, maxTimestamp, producerId, epoch, sequence,
            isTransactional, isControlBatch, false, partitionLeaderEpoch, numRecords);
}

static void writeHeader(ByteBuffer buffer,
Expand All @@ -454,14 +518,15 @@ static void writeHeader(ByteBuffer buffer,
int sequence,
boolean isTransactional,
boolean isControlBatch,
boolean isDeleteHorizonSet,
int partitionLeaderEpoch,
int numRecords) {
if (magic < RecordBatch.CURRENT_MAGIC_VALUE)
throw new IllegalArgumentException("Invalid magic value " + magic);
if (firstTimestamp < 0 && firstTimestamp != NO_TIMESTAMP)
throw new IllegalArgumentException("Invalid message timestamp " + firstTimestamp);

short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch);
short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch, isDeleteHorizonSet);

int position = buffer.position();
buffer.putLong(position + BASE_OFFSET_OFFSET, baseOffset);
Expand Down Expand Up @@ -699,6 +764,18 @@ public boolean isTransactional() {
return loadBatchHeader().isTransactional();
}

@Override
public boolean deleteHorizonSet() {
    // Lazily-loaded batch: defer to the fully parsed batch header.
    return loadBatchHeader().deleteHorizonSet();
}

/**
 * Returns the delete horizon of the underlying batch, or
 * {@link RecordBatch#NO_TIMESTAMP} when the delete-horizon flag is not set.
 */
@Override
public long deleteHorizonMs() {
    // Load the header once and let it apply the flag check itself. The
    // previous code loaded the header twice — once via deleteHorizonSet()
    // and again via super.loadBatchHeader() — and duplicated the
    // flag-to-sentinel conditional that the header's own deleteHorizonMs()
    // already implements. Also drops the inconsistent `super.` qualifier
    // (the sibling deleteHorizonSet() calls loadBatchHeader() unqualified).
    return loadBatchHeader().deleteHorizonMs();
}

@Override
public boolean isControlBatch() {
return loadBatchHeader().isControlBatch();
Expand Down
Loading