KAFKA-7321: Add a Maximum Log Compaction Lag (KIP-354)#6009

Merged
jjkoshy merged 4 commits into apache:trunk from xiowu0:maxlogcompactlag
May 13, 2019

Conversation

@xiowu0
Contributor

@xiowu0 xiowu0 commented Dec 6, 2018

Implement the change described in KIP-354
Added unit tests.
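
For context, the semantics KIP-354 adds can be sketched as follows: a log becomes eligible for compaction once the earliest record in its dirty (uncompacted) section is older than the configured maximum lag, independent of the dirty-ratio check. This is a minimal illustrative sketch; the object, function, and parameter names here are hypothetical, not Kafka's actual API:

```scala
// Illustrative sketch of KIP-354 eligibility semantics (names are hypothetical).
// A lag of 0 or less means the max-compaction-lag check is disabled.
object MaxCompactionLag {
  def mustCleanDueToLag(earliestDirtyTimestampMs: Long,
                        nowMs: Long,
                        maxCompactionLagMs: Long): Boolean =
    maxCompactionLagMs > 0 && nowMs - earliestDirtyTimestampMs > maxCompactionLagMs
}
```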

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@xiowu0
Contributor Author

xiowu0 commented Jan 14, 2019

Does anyone have time to review the code? @cmccabe @jjkoshy @lindong28

Comment thread core/src/main/scala/kafka/log/Log.scala Outdated
Contributor

@jjkoshy jjkoshy left a comment

Thanks for the patch @xiowu0

Comment thread core/src/main/scala/kafka/log/LogConfig.scala Outdated
Comment thread core/src/main/scala/kafka/server/KafkaConfig.scala Outdated
Comment thread core/src/main/scala/kafka/log/LogSegment.scala Outdated
Comment thread core/src/main/scala/kafka/log/LogCleanerManager.scala Outdated
Comment thread core/src/main/scala/kafka/log/LogCleanerManager.scala Outdated
Comment thread core/src/main/scala/kafka/log/LogCleanerManager.scala Outdated
Contributor

This works, but I think we should try to avoid passing in a new preCleanStats parameter with a default value.
For example, we could split the max-compaction-delay update out into a separate function whose only job is to update the stat; alternatively, just keep a volatile maxCompactionDelay member and update it from this method. A minor disadvantage of a snapshot stats object is that its progress has to be driven from the cleaner thread. For bulk statistics such as cleaner stats that is okay, but for a metric you might rely on for alerting, its true value could be delayed by up to log.cleaner.backoff.ms in low-volume scenarios.
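
A minimal sketch of the volatile-member alternative suggested here, assuming the cleaner thread is the only writer and the metrics reporter only reads (the class and method names are illustrative, not the merged code):

```scala
// Sketch of the suggested alternative (illustrative, not Kafka's actual code):
// instead of threading a PreCleanStats snapshot through grabFilthiestCompactedLog,
// keep a volatile field that the cleaner thread updates and a metrics gauge reads.
class CleanerManagerSketch {
  @volatile private var maxCompactionDelayMs: Long = 0L

  // Called from the cleaner thread after scanning the cleanable logs.
  def updateMaxCompactionDelay(delaysMs: Iterable[Long]): Unit =
    maxCompactionDelayMs = if (delaysMs.isEmpty) 0L else delaysMs.max

  // Read at any time by the metrics reporter without extra coordination.
  def currentMaxCompactionDelayMs: Long = maxCompactionDelayMs
}
```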

Contributor Author

grabFilthiestCompactedLog and the cleaner stats are defined in two different classes, so a volatile global variable wouldn't make our lives easier. The default "new PreCleanStats()" argument is mainly there so that the many test cases that call this function directly don't have to change.
In terms of delay, the max delay can only be safely populated in the log cleaner thread, and it reflects the correct view at the time the delay is calculated. So the next update of the max delay might be delayed by the backoff plus the time spent in the actual compaction.

Contributor

I still dislike the "step"-effect that this has. i.e., from the point in time any log is due for compaction, the maxCompactionDelay metric should be increasing with time. This is minor in the sense that you will record it the next time the cleaner gets around to computing it. I think this can be addressed in a follow-up.

Comment thread core/src/main/scala/kafka/log/Log.scala Outdated
Comment thread core/src/main/scala/kafka/log/LogSegment.scala Outdated
Contributor

I think it would also be useful to log a count of how many logs are unclean and of those, how many logs are cleanable due to violating the max compaction lag constraint.
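
A rough sketch of the kind of bookkeeping a PreCleanStats-style holder could do for these counts (the field and method names here are illustrative guesses, not necessarily the merged implementation):

```scala
// Illustrative sketch of PreCleanStats-style counters (names are guesses):
// track how many logs are cleanable, how many are cleanable specifically
// because they violate the max compaction lag, and the largest delay seen.
class PreCleanStatsSketch {
  var cleanablePartitions: Int = 0
  var delayedPartitions: Int = 0      // cleanable due to max compaction lag
  var maxCompactionDelayMs: Long = 0L

  def recordCleanablePartitions(n: Int): Unit = cleanablePartitions = n
  def recordDelayedPartitions(n: Int): Unit = delayedPartitions = n
  def maxCompactionDelay(delayMs: Long): Unit =
    maxCompactionDelayMs = math.max(maxCompactionDelayMs, delayMs)
}
```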

Contributor Author

I added the related logging; see PreCleanStats.

@jjkoshy
Contributor

jjkoshy commented Apr 23, 2019

Sorry I lost track of this - as of my last review I'm +1 but will look over it again in the next couple of days before checking in. cc @cmccabe since I heard he was interested in taking a look as well.

@ijuma
Member

ijuma commented May 7, 2019

@jjkoshy The feature freeze is in 1 week so you should hurry if you want this in the next release. :)

Comment thread core/src/main/scala/kafka/log/LogCleanerManager.scala Outdated
Comment thread core/src/main/scala/kafka/log/Log.scala Outdated
Comment thread core/src/main/scala/kafka/log/LogSegment.scala Outdated
@xiowu0 xiowu0 force-pushed the maxlogcompactlag branch from 774faeb to e66773e on May 11, 2019 01:11
(snippet from grabFilthiestCompactedLog in LogCleanerManager.scala)
      None
    } else {
      preCleanStats.recordCleanablePartitions(cleanableLogs.size)
      val filthiest = cleanableLogs.max
Contributor

Sorry I didn't notice earlier: should we actually prioritize a log that is past its max compaction delay over a log that is more dirty?

Contributor Author

The original idea was to sort logs based on how far their compaction delay has gone past the max delay. But a log with a very short compaction delay could then always take priority over a very dirty log (one with a high dirty ratio). I think it is better not to prioritize it, since the compaction finish time is not actually guaranteed anyway: the log cleaner thread can take a long time on one compaction, and it can be working on another log when a log goes beyond the max compaction lag.

Contributor

This really depends on the interpretation of the config. For PII data, for example, you should be able to provide some guarantee. Either way, there is the possibility of starvation. However, we do have sensors to indicate this situation, so I think we can leave it as is and revisit if people want harder guarantees.
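
The policy this thread settles on can be sketched like this: a log is cleanable if its dirty ratio exceeds the minimum cleanable ratio or it has violated the max compaction lag, but among cleanable logs the cleaner still picks the one with the highest dirty ratio rather than prioritizing lag-violating logs. A self-contained illustration, with hypothetical types and names rather than Kafka's actual ones:

```scala
// Illustrative sketch of the selection policy discussed above (names are
// hypothetical): lag violation makes a log *eligible*, but eligible logs
// still compete on dirty ratio, so a lag-violating log can be starved.
final case class LogState(name: String, dirtyRatio: Double, lagViolated: Boolean)

object FilthiestLogSelector {
  def grabFilthiest(logs: Seq[LogState], minDirtyRatio: Double): Option[LogState] = {
    val cleanable = logs.filter(l => l.dirtyRatio >= minDirtyRatio || l.lagViolated)
    if (cleanable.isEmpty) None else Some(cleanable.maxBy(_.dirtyRatio))
  }
}
```

Under this sketch a barely-dirty log past its lag is eligible but loses to a dirtier log, which is exactly the starvation scenario the sensors are meant to surface.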

@jjkoshy jjkoshy merged commit 1fdc853 into apache:trunk May 13, 2019
@jjkoshy
Contributor

jjkoshy commented May 13, 2019

@xiowu0 can you also provide a follow-up patch to update the website documentation?

omkreddy added a commit to confluentinc/kafka that referenced this pull request May 13, 2019
…es-14-May

* AK_REPO/trunk: (24 commits)
  KAFKA-7321: Add a Maximum Log Compaction Lag (KIP-354) (apache#6009)
  KAFKA-8335; Clean empty batches when sequence numbers are reused (apache#6715)
  KAFKA-6455: Session Aggregation should use window-end-time as record timestamp (apache#6645)
  KAFKA-6521: Use timestamped stores for KTables (apache#6667)
  [MINOR] Consolidate in-memory/rocksdb unit tests for window & session store (apache#6677)
  MINOR: Include StickyAssignor in system tests (apache#5223)
  KAFKA-7633: Allow Kafka Connect to access internal topics without cluster ACLs (apache#5918)
  MINOR: Align KTableAgg and KTableReduce (apache#6712)
  MINOR: Fix code section formatting in TROGDOR.md (apache#6720)
  MINOR: Remove unnecessary OptionParser#accepts method call from PreferredReplicaLeaderElectionCommand (apache#6710)
  KAFKA-8352 : Fix Connect System test failure 404 Not Found (apache#6713)
  KAFKA-8348: Fix KafkaStreams JavaDocs (apache#6707)
  MINOR: Add missing option for running vagrant-up.sh with AWS to vagrant/README.md
  KAFKA-8344; Fix vagrant-up.sh to work with AWS properly
  MINOR: docs typo in '--zookeeper myhost:2181--execute'
  MINOR: Remove header and key/value converter config value logging (apache#6660)
  KAFKA-8231: Expansion of ConnectClusterState interface (apache#6584)
  KAFKA-8324: Add close() method to RocksDBConfigSetter (apache#6697)
  KAFKA-6789; Handle retriable group errors in AdminClient API (apache#5578)
  KAFKA-8332: Refactor ImplicitLinkedHashSet to avoid losing ordering when converting to Scala
  ...
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
KAFKA-7321: Add a Maximum Log Compaction Lag (KIP-354)
Records become eligible for compaction after the specified time interval.

Author: Xiongqi Wu <xiowu@linkedin.com>
Reviewer: Joel Koshy <jjkoshy@gmail.com>