diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index ec3c7204fe67b..bd80981d84bda 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -52,7 +52,7 @@ * CRC => Uint32 * Attributes => Int16 * LastOffsetDelta => Int32 // also serves as LastSequenceDelta - * FirstTimestamp => Int64 + * BaseTimestamp => Int64 * MaxTimestamp => Int64 * ProducerId => Int64 * ProducerEpoch => Int16 @@ -82,19 +82,16 @@ * are retained only until either a new sequence number is written by the corresponding producer or the producerId * is expired from lack of activity. * - * There is no similar need to preserve the timestamp from the original batch after compaction. The FirstTimestamp - * field therefore always reflects the timestamp of the first record in the batch. If the batch is empty, the - * FirstTimestamp will be set to -1 (NO_TIMESTAMP). + * There is no similar need to preserve the timestamp from the original batch after compaction. The BaseTimestamp + * field therefore reflects the timestamp of the first record in the batch in most cases. If the batch is empty, the + * BaseTimestamp will be set to -1 (NO_TIMESTAMP). If the delete horizon flag is set to 1, the BaseTimestamp + * will be set to the time at which tombstone records and aborted transaction markers in the batch should be removed. * * Similarly, the MaxTimestamp field reflects the maximum timestamp of the current records if the timestamp type * is CREATE_TIME. For LOG_APPEND_TIME, on the other hand, the MaxTimestamp field reflects the timestamp set * by the broker and is preserved after compaction. Additionally, the MaxTimestamp of an empty batch always retains * the previous value prior to becoming empty. 
* - * The delete horizon flag for the sixth bit is used to determine if the first timestamp of the batch had been set to - * the time for which tombstones / transaction markers need to be removed. If it is true, then the first timestamp is - * the delete horizon, otherwise, it is merely the first timestamp of the record batch. - * * The current attributes are given below: * * --------------------------------------------------------------------------------------------------------------------------- @@ -288,15 +285,15 @@ private CloseableIterator compressedIterator(BufferSupplier bufferSuppli return new StreamRecordIterator(inputStream) { @Override - protected Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException { - return DefaultRecord.readPartiallyFrom(inputStream, skipArray, baseOffset, firstTimestamp, baseSequence, logAppendTime); + protected Record doReadRecord(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) throws IOException { + return DefaultRecord.readPartiallyFrom(inputStream, skipArray, baseOffset, baseTimestamp, baseSequence, logAppendTime); } }; } else { return new StreamRecordIterator(inputStream) { @Override - protected Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException { - return DefaultRecord.readFrom(inputStream, baseOffset, firstTimestamp, baseSequence, logAppendTime); + protected Record doReadRecord(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) throws IOException { + return DefaultRecord.readFrom(inputStream, baseOffset, baseTimestamp, baseSequence, logAppendTime); } }; } @@ -470,7 +467,7 @@ public static void writeHeader(ByteBuffer buffer, byte magic, CompressionType compressionType, TimestampType timestampType, - long firstTimestamp, + long baseTimestamp, long maxTimestamp, long producerId, short epoch, @@ -482,8 +479,8 @@ public static void writeHeader(ByteBuffer 
buffer, int numRecords) { if (magic < RecordBatch.CURRENT_MAGIC_VALUE) throw new IllegalArgumentException("Invalid magic value " + magic); - if (firstTimestamp < 0 && firstTimestamp != NO_TIMESTAMP) - throw new IllegalArgumentException("Invalid message timestamp " + firstTimestamp); + if (baseTimestamp < 0 && baseTimestamp != NO_TIMESTAMP) + throw new IllegalArgumentException("Invalid message timestamp " + baseTimestamp); short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch, isDeleteHorizonSet); @@ -493,7 +490,7 @@ public static void writeHeader(ByteBuffer buffer, buffer.putInt(position + PARTITION_LEADER_EPOCH_OFFSET, partitionLeaderEpoch); buffer.put(position + MAGIC_OFFSET, magic); buffer.putShort(position + ATTRIBUTES_OFFSET, attributes); - buffer.putLong(position + BASE_TIMESTAMP_OFFSET, firstTimestamp); + buffer.putLong(position + BASE_TIMESTAMP_OFFSET, baseTimestamp); buffer.putLong(position + MAX_TIMESTAMP_OFFSET, maxTimestamp); buffer.putInt(position + LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta); buffer.putLong(position + PRODUCER_ID_OFFSET, producerId); @@ -519,13 +516,13 @@ public static int sizeInBytes(long baseOffset, Iterable records) { return 0; int size = RECORD_BATCH_OVERHEAD; - Long firstTimestamp = null; + Long baseTimestamp = null; while (iterator.hasNext()) { Record record = iterator.next(); int offsetDelta = (int) (record.offset() - baseOffset); - if (firstTimestamp == null) - firstTimestamp = record.timestamp(); - long timestampDelta = record.timestamp() - firstTimestamp; + if (baseTimestamp == null) + baseTimestamp = record.timestamp(); + long timestampDelta = record.timestamp() - baseTimestamp; size += DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, record.key(), record.value(), record.headers()); } @@ -539,12 +536,12 @@ public static int sizeInBytes(Iterable records) { int size = RECORD_BATCH_OVERHEAD; int offsetDelta = 0; - Long firstTimestamp = null; + Long baseTimestamp = null; while 
(iterator.hasNext()) { SimpleRecord record = iterator.next(); - if (firstTimestamp == null) - firstTimestamp = record.timestamp(); - long timestampDelta = record.timestamp() - firstTimestamp; + if (baseTimestamp == null) + baseTimestamp = record.timestamp(); + long timestampDelta = record.timestamp() - baseTimestamp; size += DefaultRecord.sizeInBytes(offsetDelta++, timestampDelta, record.key(), record.value(), record.headers()); } @@ -633,7 +630,7 @@ private abstract class StreamRecordIterator extends RecordIterator { this.inputStream = inputStream; } - abstract Record doReadRecord(long baseOffset, long firstTimestamp, int baseSequence, Long logAppendTime) throws IOException; + abstract Record doReadRecord(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) throws IOException; @Override protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) { diff --git a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java index fc924b0a80723..b5f42e5b915fa 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java @@ -35,7 +35,7 @@ public interface MutableRecordBatch extends RecordBatch { /** * Set the max timestamp for this batch. When using log append time, this effectively overrides the individual * timestamps of all the records contained in the batch. To avoid recompression, the record fields are not updated - * by this method, but clients ignore them if the timestamp time is log append time. Note that firstTimestamp is not + * by this method, but clients ignore them if the timestamp type is log append time. Note that baseTimestamp is not * updated by this method. * * This typically requires re-computation of the batch's CRC. 
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index c6520bde445a5..88e59d51ad80e 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -317,7 +317,7 @@ object DumpLogSegments { " baseSequence: " + batch.baseSequence + " lastSequence: " + batch.lastSequence + " producerId: " + batch.producerId + " producerEpoch: " + batch.producerEpoch + " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " isTransactional: " + batch.isTransactional + - " isControl: " + batch.isControlBatch) + " isControl: " + batch.isControlBatch + " deleteHorizonMs: " + batch.deleteHorizonMs) else print("offset: " + batch.lastOffset) diff --git a/docs/implementation.html b/docs/implementation.html index 5d75ccbdfd684..773d510680be5 100644 --- a/docs/implementation.html +++ b/docs/implementation.html @@ -47,9 +47,10 @@

5.3.1.1 Control Batches

A control batch contains a single record called the control record. Control records should not be passed on to applications. Instead, they are used by consumers to filter out aborted transactional messages.

The key of a control record conforms to the following schema: