From 2f9f0ce69c734235c762c5689d347daf5d3f9636 Mon Sep 17 00:00:00 2001
From: Matthew Wong
Date: Wed, 19 Jan 2022 15:59:31 -0800
Subject: [PATCH 1/3] update DumpLogSegments tool and RecordBatch documentation

---
 core/src/main/scala/kafka/tools/DumpLogSegments.scala | 2 +-
 docs/implementation.html                              | 8 ++++++--
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index c6520bde445a5..88e59d51ad80e 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -317,7 +317,7 @@ object DumpLogSegments {
             " baseSequence: " + batch.baseSequence + " lastSequence: " + batch.lastSequence +
             " producerId: " + batch.producerId + " producerEpoch: " + batch.producerEpoch +
             " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " isTransactional: " + batch.isTransactional +
-            " isControl: " + batch.isControlBatch)
+            " isControl: " + batch.isControlBatch + " deleteHorizonMs: " + batch.deleteHorizonMs)
         else
           print("offset: " + batch.lastOffset)

diff --git a/docs/implementation.html b/docs/implementation.html
index 5d75ccbdfd684..756884c2a87bc 100644
--- a/docs/implementation.html
+++ b/docs/implementation.html
@@ -47,9 +47,10 @@

5.3.1.1 Control Batches

A control batch contains a single record called the control record. Control records should not be passed on to applications. Instead, they are used by consumers to filter out aborted transactional messages.

The key of a control record conforms to the following schema:

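The filtering behavior described above can be reproduced against a raw segment file with the same record APIs this patch touches. Below is a minimal sketch, not part of the patch: the segment path and class name are placeholders, and the deleteHorizonMs() accessor is assumed to be the batch-level getter that the Scala change prints.

    import java.io.File;
    import java.io.IOException;

    import org.apache.kafka.common.record.FileRecords;
    import org.apache.kafka.common.record.RecordBatch;

    public class BatchInspector {
        public static void main(String[] args) throws IOException {
            // Open a log segment file directly, much as the dump tool does.
            try (FileRecords records = FileRecords.open(new File("/tmp/kafka-logs/demo-0/00000000000000000000.log"))) {
                for (RecordBatch batch : records.batches()) {
                    // Control batches hold transaction markers; consumers use them to
                    // filter out aborted records rather than surfacing them to apps.
                    if (batch.isControlBatch())
                        continue;
                    System.out.println("baseOffset: " + batch.baseOffset() +
                            " isTransactional: " + batch.isTransactional() +
                            " deleteHorizonMs: " + batch.deleteHorizonMs());
                }
            }
        }
    }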
From ad4232259c1e093d6596030649b123fe64a7156e Mon Sep 17 00:00:00 2001
From: Matthew Wong
Date: Fri, 21 Jan 2022 17:03:11 -0800
Subject: [PATCH 2/3] replace mentions of firstTimestamp with baseTimestamp

---
 .../common/record/DefaultRecordBatch.java     | 40 +++++++++----------
 .../common/record/MutableRecordBatch.java     |  2 +-
 docs/implementation.html                      |  2 +-
 3 files changed, 22 insertions(+), 22 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index ec3c7204fe67b..67cc1c33330db 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -52,7 +52,7 @@
  * CRC => Uint32
  * Attributes => Int16
  * LastOffsetDelta => Int32 // also serves as LastSequenceDelta
- * FirstTimestamp => Int64
+ * BaseTimestamp => Int64
  * MaxTimestamp => Int64
  * ProducerId => Int64
  * ProducerEpoch => Int16
@@ -82,9 +82,9 @@
  * are retained only until either a new sequence number is written by the corresponding producer or the producerId
  * is expired from lack of activity.
  *
- * There is no similar need to preserve the timestamp from the original batch after compaction. The FirstTimestamp
+ * There is no similar need to preserve the timestamp from the original batch after compaction. The BaseTimestamp
  * field therefore always reflects the timestamp of the first record in the batch. If the batch is empty, the
- * FirstTimestamp will be set to -1 (NO_TIMESTAMP).
+ * BaseTimestamp will be set to -1 (NO_TIMESTAMP).
  *
  * Similarly, the MaxTimestamp field reflects the maximum timestamp of the current records if the timestamp type
  * is CREATE_TIME. For LOG_APPEND_TIME, on the other hand, the MaxTimestamp field reflects the timestamp set
@@ -288,15 +288,15 @@ private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSuppli
             return new StreamRecordIterator(inputStream) {
                 @Override
-                protected Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException {
-                    return DefaultRecord.readPartiallyFrom(inputStream, skipArray, baseOffset, firstTimestamp, baseSequence, logAppendTime);
+                protected Record doReadRecord(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) throws IOException {
+                    return DefaultRecord.readPartiallyFrom(inputStream, skipArray, baseOffset, baseTimestamp, baseSequence, logAppendTime);
                 }
             };
         } else {
             return new StreamRecordIterator(inputStream) {
                 @Override
-                protected Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException {
-                    return DefaultRecord.readFrom(inputStream, baseOffset, firstTimestamp, baseSequence, logAppendTime);
+                protected Record doReadRecord(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) throws IOException {
+                    return DefaultRecord.readFrom(inputStream, baseOffset, baseTimestamp, baseSequence, logAppendTime);
                 }
             };
         }
@@ -470,7 +470,7 @@ public static void writeHeader(ByteBuffer buffer,
                                    byte magic,
                                    CompressionType compressionType,
                                    TimestampType timestampType,
-                                   long firstTimestamp,
+                                   long baseTimestamp,
                                    long maxTimestamp,
                                    long producerId,
                                    short epoch,
@@ -482,8 +482,8 @@ public static void writeHeader(ByteBuffer buffer,
                                    int numRecords) {
         if (magic < RecordBatch.CURRENT_MAGIC_VALUE)
             throw new IllegalArgumentException("Invalid magic value " + magic);
-        if (firstTimestamp < 0 && firstTimestamp != NO_TIMESTAMP)
-            throw new IllegalArgumentException("Invalid message timestamp " + firstTimestamp);
+        if (baseTimestamp < 0 && baseTimestamp != NO_TIMESTAMP)
+            throw new IllegalArgumentException("Invalid message timestamp " + baseTimestamp);

         short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch, isDeleteHorizonSet);
@@ -493,7 +493,7 @@ public static void writeHeader(ByteBuffer buffer,
         buffer.putInt(position + PARTITION_LEADER_EPOCH_OFFSET, partitionLeaderEpoch);
         buffer.put(position + MAGIC_OFFSET, magic);
         buffer.putShort(position + ATTRIBUTES_OFFSET, attributes);
-        buffer.putLong(position + BASE_TIMESTAMP_OFFSET, firstTimestamp);
+        buffer.putLong(position + BASE_TIMESTAMP_OFFSET, baseTimestamp);
         buffer.putLong(position + MAX_TIMESTAMP_OFFSET, maxTimestamp);
         buffer.putInt(position + LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta);
         buffer.putLong(position + PRODUCER_ID_OFFSET, producerId);
@@ -519,13 +519,13 @@ public static int sizeInBytes(long baseOffset, Iterable<Record> records) {
             return 0;

         int size = RECORD_BATCH_OVERHEAD;
-        Long firstTimestamp = null;
+        Long baseTimestamp = null;
         while (iterator.hasNext()) {
             Record record = iterator.next();
             int offsetDelta = (int) (record.offset() - baseOffset);
-            if (firstTimestamp == null)
-                firstTimestamp = record.timestamp();
-            long timestampDelta = record.timestamp() - firstTimestamp;
+            if (baseTimestamp == null)
+                baseTimestamp = record.timestamp();
+            long timestampDelta = record.timestamp() - baseTimestamp;
             size += DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, record.key(), record.value(),
                     record.headers());
         }
@@ -539,12 +539,12 @@ public static int sizeInBytes(Iterable<SimpleRecord> records) {
         int size = RECORD_BATCH_OVERHEAD;
         int offsetDelta = 0;
-        Long firstTimestamp = null;
+        Long baseTimestamp = null;
         while (iterator.hasNext()) {
             SimpleRecord record = iterator.next();
-            if (firstTimestamp == null)
-                firstTimestamp = record.timestamp();
-            long timestampDelta = record.timestamp() - firstTimestamp;
+            if (baseTimestamp == null)
+                baseTimestamp = record.timestamp();
+            long timestampDelta = record.timestamp() - baseTimestamp;
             size += DefaultRecord.sizeInBytes(offsetDelta++, timestampDelta, record.key(), record.value(),
                     record.headers());
         }
@@ -633,7 +633,7 @@ private abstract class StreamRecordIterator extends RecordIterator {
             this.inputStream = inputStream;
         }

-        abstract Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException;
+        abstract Record doReadRecord(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) throws IOException;

         @Override
         protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
index fc924b0a80723..b5f42e5b915fa 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java
@@ -35,7 +35,7 @@ public interface MutableRecordBatch extends RecordBatch {
     /**
      * Set the max timestamp for this batch. When using log append time, this effectively overrides the individual
      * timestamps of all the records contained in the batch. To avoid recompression, the record fields are not updated
-     * by this method, but clients ignore them if the timestamp time is log append time. Note that firstTimestamp is not
+     * by this method, but clients ignore them if the timestamp time is log append time. Note that baseTimestamp is not
      * updated by this method.
      *
      * This typically requires re-computation of the batch's CRC.
diff --git a/docs/implementation.html b/docs/implementation.html
index 756884c2a87bc..773d510680be5 100644
--- a/docs/implementation.html
+++ b/docs/implementation.html
@@ -66,7 +66,7 @@
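To see why baseTimestamp is the better name, the sketch below builds a two-record batch and reads the timestamps back: the on-disk field holds the base value from which the per-record deltas are computed, which is exactly the arithmetic the sizeInBytes hunks above perform. This is a standalone demo assuming the kafka-clients jar is on the classpath; it is not part of the patch.

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.SimpleRecord;

    public class BaseTimestampDemo {
        public static void main(String[] args) {
            // The first record's timestamp (1000) becomes the batch's BaseTimestamp;
            // the second record is persisted as timestampDelta = 250 relative to it.
            MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
                    new SimpleRecord(1000L, "k1".getBytes(), "v1".getBytes()),
                    new SimpleRecord(1250L, "k2".getBytes(), "v2".getBytes()));

            for (RecordBatch batch : records.batches()) {
                System.out.println("maxTimestamp: " + batch.maxTimestamp());
                for (Record record : batch)
                    // Each timestamp is materialized as baseTimestamp + delta.
                    System.out.println("offset: " + record.offset() +
                            " timestamp: " + record.timestamp());
            }
        }
    }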