
KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically #12296

Merged
showuon merged 10 commits into apache:trunk from tyamashi-oss:KAFKA-13996
Jul 8, 2022

Conversation

@tyamashi-oss
Contributor

@tyamashi-oss tyamashi-oss commented Jun 15, 2022

  • Problem to be solved

  • Implementation:

    • Add updateDesiredRatePerSec() on Throttler
    • Call updateDesiredRatePerSec() of Throttler with new log.cleaner.io.max.bytes.per.second value in reconfigure() of Log Cleaner
    • I implemented the feature to be similar to reconfigure() of SocketServer
  • Alternative implementation considered (not adopted):

    • re-instantiate Throttler with new log.cleaner.io.max.bytes.per.second value in reconfigure() of Log Cleaner
      • However, since many parameter specifications are required to instantiate Throttler, I chose to be similar to SocketServer and update only log.cleaner.io.max.bytes.per.second
  • Test:

    • Added unit test in case of updating DesiredRatePerSec of Throttler
    • I confirmed by manual testing that log.cleaner.io.max.bytes.per.second can be changed using bin/kafka-configs.sh:
      • With this implementation, log.cleaner.io.max.bytes.per.second for Log Cleaner works as expected.

      [2022-06-15 22:44:03,089] INFO [kafka-log-cleaner-thread-0]:
      Log cleaner thread 0 cleaned log my-topic-0 (dirty section = [57585, 86901])
      2,799.3 MB of log processed in 596.0 seconds (4.7 MB/sec).
      Indexed 2,799.2 MB in 298.1 seconds (9.4 Mb/sec, 50.0% of total time)
      Buffer utilization: 0.0%
      Cleaned 2,799.3 MB in 298.0 seconds (9.4 Mb/sec, 50.0% of total time)
      Start size: 2,799.3 MB (29,317 messages)
      End size: 0.1 MB (1 messages)
      100.0% size reduction (100.0% fewer messages)
      (kafka.log.LogCleaner)
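The approach described above can be condensed into a rough sketch (hypothetical names and abbreviated shapes, not the actual Kafka source; the real reconfigure() also restarts the cleaner threads):

```scala
// Rough sketch of the approach described in this PR; classes are
// abbreviated and hypothetical, not the actual Kafka source.
class Throttler(@volatile var desiredRatePerSec: Double) {
  def updateDesiredRatePerSec(updatedDesiredRatePerSec: Double): Unit =
    desiredRatePerSec = updatedDesiredRatePerSec
}

class LogCleaner(initialIoMaxBytesPerSecond: Double) {
  val throttler = new Throttler(initialIoMaxBytesPerSecond)

  // Mirrors the shape of SocketServer.reconfigure(): push the changed
  // config value into the running component instead of re-instantiating it.
  def reconfigure(oldIoMaxBytesPerSecond: Double, newIoMaxBytesPerSecond: Double): Unit = {
    if (newIoMaxBytesPerSecond != oldIoMaxBytesPerSecond)
      throttler.updateDesiredRatePerSec(newIoMaxBytesPerSecond)
    // ...the real implementation then stops current cleaners and creates new ones
  }
}
```

This avoids re-instantiating Throttler (the rejected alternative), since only the one field needs to change.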

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Member

@showuon showuon left a comment

@tyamashi-oss , thanks for the PR. Nice catch! Left some comments.

Comment thread core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala
Comment thread core/src/main/scala/kafka/log/LogCleaner.scala
  */
 @threadsafe
-class Throttler(desiredRatePerSec: Double,
+class Throttler(var desiredRatePerSec: Double,
Member

If the desiredRatePerSec can be changed by other threads, should we volatile it?

Member

@showuon given that we're synchronizing on lock anyway at the point of use, maybe synchronizing in updateDesiredRatePerSec() would actually be better?

Member

@tombentley Good point! +1. Thanks.

Contributor Author

Thanks for pointing out, @showuon and @tombentley.
Indeed, it needs volatile or synchronized.

I would like your opinion: should it use volatile, or synchronized?
I have changed the implementation once with volatile, so if you prefer synchronized, please let me know. 9b2477d

If we use synchronized, should we apply a synchronized block on this object, as in the following code?
I don't think the lock object can be used for synchronization in updateDesiredRatePerSec() the way the lock synchronized {…} block is used in maybeThrottle(): maybeThrottle() may hold the lock for more than a few minutes due to frequent sleeps, depending on Throttler.desiredRatePerSec, and that would block the reconfigure() thread in updateDesiredRatePerSec() as well.

  def updateDesiredRatePerSec(updatedDesiredRatePerSec: Double): Unit = {
    this synchronized {
      desiredRatePerSec = updatedDesiredRatePerSec
    }
  }

Member

Thanks for pointing it out. Yes, it sleeps inside the lock. At first, I thought it was a bug, but after further thought, I think it is intentional, because multiple threads might try to call the maybeThrottle method and we'd like to calculate the throttle time sequentially (I guess). This code has been there since 2011, so I think we should just keep it as is.
In that case, I agree volatile is the better solution. Otherwise, the reconfigure change might not take effect immediately.

@tombentley , WDYT?

Member

That seems reasonable.
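To make the agreed trade-off concrete: maybeThrottle() sleeps while holding the lock, so an update method synchronizing on the same lock could stall the reconfigure() thread for the duration of a sleep, whereas a @volatile field publishes the new rate immediately. A minimal sketch (hypothetical and condensed, not the actual Throttler source):

```scala
// Condensed, hypothetical sketch of the locking trade-off discussed above.
class ThrottlerSketch(@volatile var desiredRatePerSec: Double) {
  private val lock = new Object

  // Writer side: a plain volatile write that never takes `lock`, so it
  // cannot be blocked by a cleaner thread sleeping inside maybeThrottle().
  def updateDesiredRatePerSec(rate: Double): Unit =
    desiredRatePerSec = rate

  // Reader side: sleeps while holding `lock` (as the real maybeThrottle
  // does), which is why synchronizing the update on `lock` was rejected.
  def maybeThrottle(bytesWritten: Double): Unit = lock.synchronized {
    val rate = desiredRatePerSec // volatile read observes the latest update
    if (rate > 0 && bytesWritten > 0)
      Thread.sleep((bytesWritten / rate * 1000).toLong)
  }
}
```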

Comment thread core/src/main/scala/kafka/log/LogCleaner.scala
logDirFailureChannel = new LogDirFailureChannel(1),
time = time)

assertEquals(logCleaner.throttler.desiredRatePerSec, 10000000, "Throttler.desiredRatePerSec should be initialized with KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")
Member

@showuon showuon Jul 5, 2022

nit: the 1st parameter of assertEquals should be the expected value, and the 2nd one the actual value. The same comment applies to the assertion below. That is:
assertEquals(1000000, logCleaner.throttler.desiredRatePerSec)
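For context, JUnit 5's Assertions.assertEquals reports "expected: <e> but was: <a>" on failure, so swapped arguments produce a misleading message even though the pass/fail result is the same. A small stand-in illustrating the message shape (not the JUnit source):

```scala
// Stand-in for JUnit's assertEquals(expected, actual, message), showing
// why argument order matters for the failure diagnostic.
object AssertOrder {
  def assertEquals(expected: Any, actual: Any, message: String = ""): Unit =
    if (expected != actual)
      throw new AssertionError(s"$message ==> expected: <$expected> but was: <$actual>")
}

// With the arguments in the right order, a failure reads correctly,
// e.g. "expected: <20000000> but was: <10000000>".
```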

Member

Also, the error message might be able to update to:

Throttler.desiredRatePerSec should be initialized from initial `cleaner.io.max.bytes.per.second` config.

Contributor Author

Thank you. I’ve updated the assert method parameters and the error message.
e05707d


logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new KafkaConfig(newKafkaProps))

assertEquals(logCleaner.throttler.desiredRatePerSec, 20000000, "Throttler.desiredRatePerSec should be updated with new KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")
Member

The error message might be able to update to:

Throttler.desiredRatePerSec should be updated with new `cleaner.io.max.bytes.per.second` config.

Contributor Author

Thank you. I’ve updated the error message.
e05707d

Comment thread core/src/main/scala/kafka/log/LogCleaner.scala

/**
- * Reconfigure log clean config. This simply stops current log cleaners and creates new ones.
+ * Reconfigure log clean config. This updates desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond and stops current log cleaners and creates new ones.
Member

There are two "and"s in the sentence now. Maybe we can put it like this to make it clearer:

It will (1) update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond if necessary (2) stop current log cleaners and create new ones.

WDYT?

Contributor Author

Thank you. I’ve updated the comment.
f9d27f6


Member

@showuon showuon left a comment

@tyamashi-oss , thanks for the update. Left some comments.

val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
newKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 20000000)

logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new KafkaConfig(newKafkaProps))
Member

I think the logCleaner should not call shutdown in the end since we never started it up, am I correct?

Contributor Author

Your concern is correct. Thank you.
LogCleaner.shutdown() should be called at the end of the test, because kafka-log-cleaner-thread-x threads are created by LogCleaner.startup() at the end of LogCleaner.reconfigure(), and those threads would otherwise keep running.
I appended LogCleaner.shutdown() to the end of the test and also used a LogCleaner with empty startup() and shutdown() implementations.
This makes the test somewhat white-box, since it depends on the LogCleaner.reconfigure() implementation, but I couldn't think of any other way. Please let me know if you have one.
e05707d
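A minimal sketch of that test structure (hypothetical names, not the actual LogCleanerTest): override the lifecycle methods with no-ops so reconfigure() can be exercised without spawning real cleaner threads, while the finally-block shutdown stays harmless:

```scala
// Hypothetical sketch of the test shape described above; not the actual
// Kafka test code.
class CleanerUnderTest {
  var threadsRunning = 0
  def startup(): Unit = threadsRunning += 1 // real code spawns kafka-log-cleaner-thread-x
  def shutdown(): Unit = threadsRunning = 0
  def reconfigure(): Unit = { shutdown(); startup() } // real code also updates the throttler
}

val logCleaner = new CleanerUnderTest {
  // No-op overrides: reconfigure() calls shutdown()+startup() internally,
  // and the test must not leave cleaner threads behind.
  override def startup(): Unit = ()
  override def shutdown(): Unit = ()
}

try {
  logCleaner.reconfigure() // exercise the config-update path only
} finally {
  logCleaner.shutdown() // harmless no-op here, matches the real test's cleanup
}
```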

Tomonari Yamashita added 2 commits July 6, 2022 20:04
Member

@showuon showuon left a comment

@tyamashi-oss , thanks for the update. LGTM! Left a minor comment. And also, please fix the compilation error (only failed in java8/scala2.12 job):

[2022-07-06T11:15:21.162Z] > Task :core:compileTestScala FAILED

[2022-07-06T11:15:21.162Z] [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-12296/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:1859:70: the result type of an implicit conversion must be more specific than Object

[2022-07-06T11:15:21.162Z] [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-12296/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:1876:72: the result type of an implicit conversion must be more specific than Object

https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-12296/6/pipeline

logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new KafkaConfig(newKafkaProps))

assertEquals(20000000, logCleaner.throttler.desiredRatePerSec, s"Throttler.desiredRatePerSec should be updated with new `${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
} finally logCleaner.shutdown();
Member

nit: (1) no semicolon is needed (2) the format in Kafka is usually like this:

finally {
  logCleaner.shutdown()
}

Contributor Author

I've fixed it. Thank you for pointing out. 5c72740
With this fix, the java8/scala2.12 build now succeeds.

However, the JDK 11 / Scala 2.13 build, which used to succeed, now failed.
I believe this new build failure is unrelated to my changes. I tried running the same command in my local environment once and this build was successful.

[2022-07-07T06:22:53.589Z] > Task :streams:integrationTest FAILED

[2022-07-07T06:22:53.589Z] 

[2022-07-07T06:22:53.589Z] FAILURE: Build failed with an exception.

https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-12296/7/pipeline/

def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
-oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000)
+oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000: java.lang.Double)
Member

@showuon showuon Jul 7, 2022

nit: I think we can just put 10000000L here for long value.

Contributor Author

Thank you for noticing.
After checking other tests, I replaced Properties.put(Object, Object) with Properties.setProperty(String, String), using strings rather than numbers, because it is appropriate to use strings as keys and values for Properties, and string values are also used when actually creating a KafkaConfig. I also confirmed that the previous compilation error reappeared when using 10000000L: "the result type of an implicit conversion must be more specific than Object"
ba640f5
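The error can be reproduced in miniature: java.util.Properties extends Hashtable[Object, Object], so put() forces a Scala numeric literal into Object via an implicit conversion, which Scala 2.12 rejects. setProperty(String, String) avoids the issue entirely (a sketch; the real test uses KafkaConfig.LogCleanerIoMaxBytesPerSecondProp as the key):

```scala
import java.util.Properties

val props = new Properties()
// props.put("log.cleaner.io.max.bytes.per.second", 10000000L)
//   -> under Scala 2.12: "the result type of an implicit conversion
//      must be more specific than Object"
// String keys and values are also what KafkaConfig parsing expects:
props.setProperty("log.cleaner.io.max.bytes.per.second", "10000000")
```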

Member

Thanks for the update!

Member

@tombentley tombentley left a comment

Just a nit on the formatting of the doc comment, otherwise LGTM.


Tomonari Yamashita and others added 2 commits July 8, 2022 15:58
fix java doc comment format

Co-authored-by: Tom Bentley <tombentley@users.noreply.github.com>
Member

@showuon showuon left a comment

LGTM! Let's wait for the jenkins build results.

@showuon
Member

showuon commented Jul 8, 2022

Failed tests are unrelated:

    Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets
    Build / JDK 8 and Scala 2.12 / kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()

@showuon showuon merged commit e85500b into apache:trunk Jul 8, 2022
@showuon
Member

showuon commented Jul 8, 2022

@tyamashi-oss , thanks for catching the issue and the fix!

@tyamashi-oss
Contributor Author

@showuon @tombentley @Kvicii , thank you for your guidance. It was very helpful.

ijuma added a commit to confluentinc/kafka that referenced this pull request Aug 3, 2022
> $ git merge-base apache-github/3.3 apache-github/trunk
> 23c92ce

> $ git show 23c92ce
> commit 23c92ce
> Author: SC <pch838811@gmail.com>
> Date:   Mon Jul 11 11:36:56 2022 +0900
>
>    MINOR: Use String#format for niceMemoryUnits result (apache#12389)
>
>    Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>

* commit '23c92ce79366e86ca719e5e51c550c27324acd83':
  MINOR: Use String#format for niceMemoryUnits result (apache#12389)
  KAFKA-14055; Txn markers should not be removed by matching records in the offset map (apache#12390)
  KAFKA-13474: Allow reconfiguration of SSL certs for broker to controller connection (apache#12381)
  KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically (apache#12296)
  KAFKA-13983: Fail the creation with "/" in resource name in zk ACL (apache#12359)
  KAFKA-12943: update aggregating documentation (apache#12091)
  KAFKA-13846: Follow up PR to address review comments (apache#12297)
