Skip to content

KAFKA-13883: Implement NoOpRecord and metadata metrics#12183

Merged
cmccabe merged 11 commits intoapache:trunkfrom
jsancio:kafka-13883-monitor-quorum
Jun 1, 2022
Merged

KAFKA-13883: Implement NoOpRecord and metadata metrics#12183
cmccabe merged 11 commits intoapache:trunkfrom
jsancio:kafka-13883-monitor-quorum

Conversation

@jsancio
Copy link
Copy Markdown
Member

@jsancio jsancio commented May 19, 2022

Public Changes

  1. A new configuration flag metadata.max.idle.interval.ms
  2. A new metrics group "broker-metadata-metrics" to the brokers
  3. Four new metrics to the KafkaController metric group
  4. Increase the IBP and metadata.version
  5. A new metadata record called NoOpRecord which is only written if we have a matching metadata version

Implementation Changes

The KRaft controller schedules an event to write NoOpRecord to the metadata log if the metadata version supports this feature. This event is scheduled at the interval defined in metadata.max.idle.interval.ms. Brokers and controllers were improved to ignore the NoOpRecord when replaying the metadata log.

The replica control manager was also changed to accept a metadata version supplier to determine if leader recovery state is supported.

Other Changes

Fixed a bug where metadata version 3.3-IV1 was not marked as changing the metadata.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@jsancio jsancio force-pushed the kafka-13883-monitor-quorum branch from bbad90f to d94afda Compare May 19, 2022 18:19
OptionalLong.empty()
}

val maxIdleIntervalNs = config.metadataMaxIdleIntervalMs match {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's more traditional to have conversion stuff like this in KafkaConfig. If you had a function like maxIdleIntervalNs there, it could return Option[Long] and we could convert that into OptionalLong as needed (I think a simple asScala can do it if you have the optional converter imported...)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. I fixed this. asJava converts it to a Optional[Long] so I did it manually.

val MetadataLogDirDoc = "This configuration determines where we put the metadata log for clusters in KRaft mode. " +
"If it is not set, the metadata log is placed in the first log directory from log.dirs."
val MetadataSnapshotMaxNewRecordBytesDoc = "This is the maximum number of bytes in the log between the latest snapshot and the high-watermark needed before generating a new snapshot."
val MetadataMaxIdleIntervalMsDoc = "This configuration controls how often the active " +
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we disallow negative values for this? 0 for disabled seems clear enough.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


/************* Metadata Configuration ***********/
val metadataSnapshotMaxNewRecordBytes = getLong(KafkaConfig.MetadataSnapshotMaxNewRecordBytesProp)
val metadataMaxIdleIntervalMs: Option[Int] = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like I wrote above, seems like the accessor we want should give us nanoseconds not milliseconds. I suppose the value in nanoseconds should be a long too...

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

.define(MetadataLogSegmentMillisProp, LONG, Defaults.LogRollHours * 60 * 60 * 1000L, null, HIGH, MetadataLogSegmentMillisDoc)
.define(MetadataMaxRetentionBytesProp, LONG, Defaults.LogRetentionBytes, null, HIGH, MetadataMaxRetentionBytesDoc)
.define(MetadataMaxRetentionMillisProp, LONG, Defaults.LogRetentionHours * 60 * 60 * 1000L, null, HIGH, MetadataMaxRetentionMillisDoc)
.define(MetadataMaxIdleIntervalMsProp, INT, Defaults.MetadataMaxIdleIntervalMs, null, LOW, MetadataMaxIdleIntervalMsDoc)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we valiate that this is not negative? like I wrote above, there seems to be no reason to allow negative values

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.MetricConfig

final class BrokerMetrics private (metrics: Metrics) extends AutoCloseable {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you feel about BrokerServerMetrics as a name for this? That might make it more clear that this doesn't apply to KafkaServer...

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me.


// Support for leader recovery for unclean leader election (KIP-704)
IBP_3_2_IV0(4, "3.2", "IV0", false),
IBP_3_2_IV0(4, "3.2", "IV0", true),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for fixing this. We'll have to make a note in the PR description as well


import static java.util.concurrent.TimeUnit.NANOSECONDS;

import java.util.ArrayList;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kind of prefer the style where all the java.util imports are in a single block, can we do that here?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

if (controllerMetrics == null) {
throw new RuntimeException("You must specify controllerMetrics.");
if (metadataVersion == null) {
throw new IllegalStateException("Metadata version must be set before building");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's just default to the latest version. It's very tedious to have to set this in every single unit test (most of which don't care)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me.

private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new Random());
private long snapshotMaxNewRecordBytes = Long.MAX_VALUE;
private OptionalLong leaderImbalanceCheckIntervalNs = OptionalLong.empty();
private OptionalLong maxIdleIntervalNs = OptionalLong.empty();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an interesting choice to have this off by default. I guess it probably would complicate a bunch of junit tests since it would create records we're not expecting. So maybe this is the correct way to go for now.

leaderImbalanceCheckIntervalNs, sessionTimeoutNs, controllerMetrics,
createTopicPolicy, alterConfigPolicy, configurationValidator, authorizer,
staticConfig, bootstrapMetadata);
return new QuorumController(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sigh. all right, fine. I guess we might as well do one line per parameter.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hehe. I thought it was improvement. I always find the previous style harder to read and update.

// the default append linger for KRaft is 25ms.
controllerMetrics.setLastAppliedRecordTimestamp(time.milliseconds());
} else {
// This is called with an offset of -1 when the active controller renounced leadership
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the comment should just mention that lastAppliedRecordOffset tracks last committed offset for standbys.

The bit about writeOffset being -1 is a bit low-level (it is true but that feels more like an implementation detail here...)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Copy Markdown
Contributor

@cmccabe cmccabe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@cmccabe cmccabe merged commit 7d1b092 into apache:trunk Jun 1, 2022
@jsancio
Copy link
Copy Markdown
Member Author

jsancio commented Jun 1, 2022

Merging since the failure is unrelated. The failure at in the ZK tests and this is a KRaft only change:

testTopicIdUpgradeAfterReassigningPartitions() – kafka.controller.ControllerIntegrationTest

@jsancio jsancio deleted the kafka-13883-monitor-quorum branch June 1, 2022 17:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants