Skip to content

KAFKA-8335; Clean empty batches when sequence numbers are reused#6715

Merged
hachikuji merged 1 commit intoapache:trunkfrom
hachikuji:KAFKA-8335
May 13, 2019
Merged

KAFKA-8335; Clean empty batches when sequence numbers are reused#6715
hachikuji merged 1 commit intoapache:trunkfrom
hachikuji:KAFKA-8335

Conversation

@hachikuji
Copy link
Copy Markdown
Contributor

The log cleaner attempts to preserve the last entry for each producerId in order to ensure that sequence/epoch state is not lost. The current validation checks only the last sequence number for each producerId in order to decide whether a batch should be retained. There are two problems with this:

  1. Sequence numbers are not unique alone. It is the tuple of sequence number and epoch which is uniquely defined.
  2. The group coordinator always writes batches beginning with sequence number 0, which means there could be many batches which have the same sequence number.

The complete fix for the second issue is probably to do the proper sequence number bookkeeping in the coordinator. For now, I have left the coordinator implementation unchanged and changed the cleaner logic to use the last offset written by a producer instead of the last sequence number.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Just a question about the second condition.


try {
cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletes, log.config.maxMessageSize,
transactionMetadata, log.activeProducersWithLastSequence, stats)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this activeProducersWithLastSequence function now right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's still used pretty heavily in test cases.

lastRecordsOfActiveProducers.get(batch.producerId).exists { lastRecord =>
lastRecord.lastDataOffset match {
case Some(offset) => batch.lastOffset == offset
case None => batch.isControlBatch && batch.producerEpoch == lastRecord.producerEpoch
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just being paranoid here: a txn marker should always follow some data (from the same producer-id) on the partition -- i.e. if there is no data produced to this partition, there should be no txn marker.

With that, I'm not sure if the second condition should really happen or not: a producer has never produced any data, hence has no latest offsets, but still have a control batch. Even with compaction, the producer's latest offset should still be preserved right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A producer can abort after adding the partition to the transaction but before sending any data. In this case, there could be markers without any data.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack, makes sense.

@guozhangwang
Copy link
Copy Markdown
Contributor

retest this please

@hachikuji hachikuji merged commit d798dbf into apache:trunk May 13, 2019
hachikuji added a commit that referenced this pull request May 13, 2019
The log cleaner attempts to preserve the last entry for each producerId in order to ensure that sequence/epoch state is not lost. The current validation checks only the last sequence number for each producerId in order to decide whether a batch should be retained. There are two problems with this:

1. Sequence numbers are not unique alone. It is the tuple of sequence number and epoch which is uniquely defined.
2. The group coordinator always writes batches beginning with sequence number 0, which means there could be many batches which have the same sequence number.

The complete fix for the second issue would probably add proper sequence number bookkeeping in the coordinator. For now, we have left the coordinator implementation unchanged and changed the cleaner logic to use the last offset written by a producer instead of the last sequence number. 

Reviewers: Guozhang Wang <wangguoz@gmail.com>
hachikuji added a commit that referenced this pull request May 13, 2019
The log cleaner attempts to preserve the last entry for each producerId in order to ensure that sequence/epoch state is not lost. The current validation checks only the last sequence number for each producerId in order to decide whether a batch should be retained. There are two problems with this:

1. Sequence numbers are not unique alone. It is the tuple of sequence number and epoch which is uniquely defined.
2. The group coordinator always writes batches beginning with sequence number 0, which means there could be many batches which have the same sequence number.

The complete fix for the second issue would probably add proper sequence number bookkeeping in the coordinator. For now, we have left the coordinator implementation unchanged and changed the cleaner logic to use the last offset written by a producer instead of the last sequence number. 

Reviewers: Guozhang Wang <wangguoz@gmail.com>
hachikuji added a commit that referenced this pull request May 13, 2019
The log cleaner attempts to preserve the last entry for each producerId in order to ensure that sequence/epoch state is not lost. The current validation checks only the last sequence number for each producerId in order to decide whether a batch should be retained. There are two problems with this:

1. Sequence numbers are not unique alone. It is the tuple of sequence number and epoch which is uniquely defined.
2. The group coordinator always writes batches beginning with sequence number 0, which means there could be many batches which have the same sequence number.

The complete fix for the second issue would probably add proper sequence number bookkeeping in the coordinator. For now, we have left the coordinator implementation unchanged and changed the cleaner logic to use the last offset written by a producer instead of the last sequence number. 

Reviewers: Guozhang Wang <wangguoz@gmail.com>
omkreddy added a commit to confluentinc/kafka that referenced this pull request May 13, 2019
…es-14-May

* AK_REPO/trunk: (24 commits)
  KAFKA-7321: Add a Maximum Log Compaction Lag (KIP-354) (apache#6009)
  KAFKA-8335; Clean empty batches when sequence numbers are reused (apache#6715)
  KAFKA-6455: Session Aggregation should use window-end-time as record timestamp (apache#6645)
  KAFKA-6521: Use timestamped stores for KTables (apache#6667)
  [MINOR] Consolidate in-memory/rocksdb unit tests for window & session store (apache#6677)
  MINOR: Include StickyAssignor in system tests (apache#5223)
  KAFKA-7633: Allow Kafka Connect to access internal topics without cluster ACLs (apache#5918)
  MINOR: Align KTableAgg and KTableReduce (apache#6712)
  MINOR: Fix code section formatting in TROGDOR.md (apache#6720)
  MINOR: Remove unnecessary OptionParser#accepts method call from PreferredReplicaLeaderElectionCommand (apache#6710)
  KAFKA-8352 : Fix Connect System test failure 404 Not Found (apache#6713)
  KAFKA-8348: Fix KafkaStreams JavaDocs (apache#6707)
  MINOR: Add missing option for running vagrant-up.sh with AWS to vagrant/README.md
  KAFKA-8344; Fix vagrant-up.sh to work with AWS properly
  MINOR: docs typo in '--zookeeper myhost:2181--execute'
  MINOR: Remove header and key/value converter config value logging (apache#6660)
  KAFKA-8231: Expansion of ConnectClusterState interface (apache#6584)
  KAFKA-8324: Add close() method to RocksDBConfigSetter (apache#6697)
  KAFKA-6789; Handle retriable group errors in AdminClient API (apache#5578)
  KAFKA-8332: Refactor ImplicitLinkedHashSet to avoid losing ordering when converting to Scala
  ...
Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hachikuji : Thanks for the PR. LGTM too.

pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…che#6715)

The log cleaner attempts to preserve the last entry for each producerId in order to ensure that sequence/epoch state is not lost. The current validation checks only the last sequence number for each producerId in order to decide whether a batch should be retained. There are two problems with this:

1. Sequence numbers are not unique alone. It is the tuple of sequence number and epoch which is uniquely defined.
2. The group coordinator always writes batches beginning with sequence number 0, which means there could be many batches which have the same sequence number.

The complete fix for the second issue would probably add proper sequence number bookkeeping in the coordinator. For now, we have left the coordinator implementation unchanged and changed the cleaner logic to use the last offset written by a producer instead of the last sequence number. 

Reviewers: Guozhang Wang <wangguoz@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants