Skip to content

KAFKA-13769: Explicitly route FK join results to correct partitions#11945

Merged
mjsax merged 12 commits intoapache:trunkfrom
Gerrrr:KAFKA-13769
Apr 15, 2022
Merged

KAFKA-13769: Explicitly route FK join results to correct partitions#11945
mjsax merged 12 commits intoapache:trunkfrom
Gerrrr:KAFKA-13769

Conversation

@Gerrrr
Copy link
Copy Markdown
Contributor

@Gerrrr Gerrrr commented Mar 24, 2022

Prior to this commit FK response sink routed FK results to
SubscriptionResolverJoinProcessorSupplier using the primary key.

There are cases, where this behaviour is incorrect. For example,
if KTable key serde differs from the data source serde which might
happen without a key changing operation.

Instead of determining the resolver partition by serializing the PK
this patch includes target partition in SubscriptionWrapper and
SubscriptionResponseWrapper payloads. Default FK response-sink
partitioner extracts the correct partition from the value and
routes the message accordingly.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Prior to this commit FK response sink routed FK results to
SubscriptionResolverJoinProcessorSupplier using the primary key.

There are cases, where this behaviour is incorrect. For example,
if KTable key serde differs from the data source serde which might
happen without a key changing operation.

Instead of determining the resolver partition by serializing the PK
this patch includes target partition in SubscriptionWrapper and
SubscriptionResponseWrapper payloads. Default FK response-sink
partitioner extracts the correct partition from the value and
routes the message accordingly.
);

final ByteBuffer buf;
int dataLength = 2 + primaryKeySerializedData.length;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we unify the dataLength computation into a single place? It's a little bit scattered at hard to follow. Also with different versions, I like a more explicit pattern using switch:

switch (version) {
case 0: encodeV1(); break;
case 1: encodeV2(); break;
default: throw...
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in a59a4f6

@Override
public byte[] serialize(final String topic, final SubscriptionResponseWrapper<V> data) {
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized data}
//{1-bit-isHashNull}{7-bits-version}{4-bytes-primaryPartition}{Optional-16-byte-Hash}{n-bytes serialized data}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we cannot just write the new format blindly. The receiving task may be executed on a different instance that might not have been ungraded yet during a rolling upgrade. -- We can only start to use the new format after all instances are upgraded, otherwise older instances would crash (or the upgrade can not be done in rolling bounce fashion what seen not acceptable).

Not sure right now how we can handle this case though: The coarse grained solution would be to use a config and let users do two rolling bounces (requires a KIP). Or we try to piggy-back on the rebalance metadata version, but I am not sure how easy it would be to do, given that we setup the serdes at a totally different layer and get access to the rebalance metadata version only during runtime.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is indeed a problem. It's the same issue I documented here: https://issues.apache.org/jira/browse/KAFKA-10336

I was also thinking that the rebalance metadata version could give us a mechanism to coordinate this kind of thing across a rolling bounce. I documented the approach in that ticket, but never got a chance to work on it.

If we don't want to bite off a general solution to it, two rolling bounces might be the best compromise, as annoying as they are.

Alternatively, since we actually do check the version and throw an exception if we encounter it, it is "safe" (in that there's no data corruption) to just go ahead and let the exceptions happen during a rolling bounce. Not the nicest thing to do, though.

Copy link
Copy Markdown
Contributor Author

@Gerrrr Gerrrr Mar 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen when an old processor receives a v1 record and throws an exception? Will it throw that record on the floor or just blocks processing until the upgrade?

As John suggested in KAFKA-10336, we can pass UPGRADE_FROM into the serde so that it produces v0 records if the option is set. I will also add 3.3 as a valid choice for that option. WDYT?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen when an old processor receives a v1 record and throws an exception? Will it throw that record on the floor or just blocks processing until the upgrade?

The thread with crash, and we will eventually rebalance. So there is no data-loss, but for at-least-once duplicate processing. It's very bad user experience if they upgrade one instance and suddenly all other might crash...

Re-using UPGRAD_FROM should work. The config is already there, so maybe it's ok if we just add a never "accepted value" without doing a KIP? We need to cover this case in the upgrade docs though!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added UPGRADE_FROM in d9f6c52. There, I added all versions from 2.7 to 3.2 as UPGRADE_FROM options. If serde detects the upgrade, it uses v0 format to serialize the data. Deserializer does not need any changes as there is no difference between v0 and upgrade-from-v0 formats.

} else {
buf.put((byte) (data.getVersion() | (byte) 0x80));
}
buf.putInt(data.getPrimaryPartition());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this to the end of the byte array and thus re-use serializeV0 to write the prefix?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if this is a good idea. As an advantage, we will remove duplicated parts in serialize. However, there are 2 downsides:

  • We use v0 path in 2 situations - when it is real v0 and when we upgrade from v0 (i.e. a data record can actually have a primary partition and v1 as part of its state). This means that we can't just put data.getVersion() into the buffer, but have to hardcode it to 0. Making this logic generic enough to handle v0, upgrade-from, and v1 will be cumbersome.
  • Moving a fixed-size field to the end of the byte array will complicate deserialization. Consider

final byte[] primaryKeyRaw = new byte[data.length - lengthSum]; //The remaining data is the serialized pk
buf.get(primaryKeyRaw, 0, primaryKeyRaw.length);
if (primaryKeySerializationPseudoTopic == null) {
primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopicSupplier.get();
}
final K primaryKey = primaryKeyDeserializer.deserialize(primaryKeySerializationPseudoTopic,
primaryKeyRaw);

Right now, the logic for finding PK size is straightforward. It can be summarized as "take it from here and until the end of the buffer". If we move primary partition to the end, we'll have to remove another 4 bytes iff the version is greater than 0.

I'd say that code duplication is lesser evil in this case. WDYT? I am happy to move primary partition to the end if you don't agree.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means that we can't just put data.getVersion() into the buffer, but have to hardcode it to 0

Yes, but this seems ok? Ie, we would have a serializeV0Internal(final short version) that is called as (something like):

public byte[] serializeV0() {
  return serializeV0Internal(0).getBytes();
}

public byte[] serializeV1() {
  final ByteStream out = serializeV0Internal(0);
  
  // add v1 stuff to `out`
  
  return out.getBytes();
}

Making this logic generic enough to handle v0, upgrade-from, and v1 will be cumbersome.

Why?

I understand that putting variable length at the end could be simpler. I guess I am just used to how it's usually done if Kafka. And personally, if I read code it's easier to have a mental model of the different versions in the bytes, if older versions are always a prefix of newer versions?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 13e772e.

Comment thread docs/streams/upgrade-guide.html Outdated
Upgrading from any older version to {{fullDotVersion}} is possible: if upgrading from 2.3 or below, you will need to do two rolling bounces, where during the first rolling bounce phase you set the config <code>upgrade.from="older version"</code>
(possible values are <code>"0.10.0" - "2.3"</code>) and during the second you remove it. This is required to safely upgrade to the new cooperative rebalancing protocol of the embedded consumer. Note that you will remain using the old eager
rebalancing protocol if you skip or delay the second rolling bounce, but you can safely switch over to cooperative at any time once the entire group is on 2.4+ by removing the config value and bouncing. For more details please refer to
Upgrading from any older version to {{fullDotVersion}} is possible: if upgrading from 3.3 or below, you will need to do two rolling bounces, where during the first rolling bounce phase you set the config <code>upgrade.from="older version"</code>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

upgrading from 3.3

Should be 3.2 -- the fix should go into 3.3, right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right! Fixed in cd8ba18.

"Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101 + "\", \"" +
UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" +
UPGRADE_FROM_11 + "\", \"" + UPGRADE_FROM_20 + "\", \"" + UPGRADE_FROM_21 + "\", \"" +
UPGRADE_FROM_22 + "\", \"" + UPGRADE_FROM_23 + "\" (for upgrading from the corresponding old version).";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When upgrading from 2.4 to a newer version it is not required to specify this config.

Seems this needs an update?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Fixed in cd8ba18.


private static boolean upgradeFromV0(final Map<String, ?> configs) {
final Object upgradeFrom = configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
if (!(upgradeFrom instanceof String)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this check. StreamsConfig ensures that it will be a String. Cf StreamConfig:

.define(UPGRADE_FROM_CONFIG,
        ConfigDef.Type.STRING,

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaced with a not-null check in 8ebbaaf.


private static boolean upgradeFromV0(final Map<String, ?> configs) {
final Object upgradeFrom = configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
if (!(upgradeFrom instanceof String)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaced with a not-null check in 8ebbaaf.

} else {
buf.put((byte) (data.getVersion() | (byte) 0x80));
}
buf.putInt(data.getPrimaryPartition());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means that we can't just put data.getVersion() into the buffer, but have to hardcode it to 0

Yes, but this seems ok? Ie, we would have a serializeV0Internal(final short version) that is called as (something like):

public byte[] serializeV0() {
  return serializeV0Internal(0).getBytes();
}

public byte[] serializeV1() {
  final ByteStream out = serializeV0Internal(0);
  
  // add v1 stuff to `out`
  
  return out.getBytes();
}

Making this logic generic enough to handle v0, upgrade-from, and v1 will be cumbersome.

Why?

I understand that putting variable length at the end could be simpler. I guess I am just used to how it's usually done if Kafka. And personally, if I read code it's easier to have a mental model of the different versions in the bytes, if older versions are always a prefix of newer versions?

@Gerrrr
Copy link
Copy Markdown
Contributor Author

Gerrrr commented Apr 11, 2022

@mjsax Can you please take another look?

Copy link
Copy Markdown
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM -- just wondering if we need to change the subscription-response wire format?

Also (can also be a follow up PR): we should add a system test for the upgrade to make sure it really works) -- we have already upgrade tests, so maybe it's sufficient to add a FK-join to this test topology to cover this case?

public class SubscriptionResponseWrapper<FV> {
final static byte CURRENT_VERSION = 0x00;
final static byte CURRENT_VERSION = 0x01;
// 0x00
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

// version 0x00 fields:

Same below:

// version 0x01 fields:

Or just change to 1 instead of 0x01 (compare Subscription.java -- might be nice to align both).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private final FV foreignValue;
private final byte version;
// 0x01
private final Integer primaryPartition;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we need this field in this class, so we actually need to serialize it? It seems it must not go over the wire, does it?

Thus, for the subscription response, we don't need the dump the version? -- We only need it on the right hand side to write the response into the topic, but we can do this in-memory only?

If this is correct, we also don't need to update the corresponding Serde.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Done in f7f9181.

static final byte CURRENT_VERSION = 0;
static final byte CURRENT_VERSION = 1;

// v0
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

// v0 fields:

Same for v1 below

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


final K primaryKey = primaryKeyDeserializer.deserialize(primaryKeySerializationPseudoTopic,
primaryKeyRaw);
primaryKeyRaw);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indention

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 1361497.

final SubscriptionResponseWrapper<V> data,
final byte version,
final int extraLength) {
final byte[] serializedData = data.getForeignValue() == null ? null : serializer.serialize(topic, data.getForeignValue());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit (formatting -- hard to see where method signature end and code starts):

    final int extraLength
) {
    final byte[] serializedData = data.getForeignValue() == null ? null : serializer.serialize(topic, data.getForeignValue());

final int valueLength;
if (version > 0) {
valueLength = data.length - lengthSum - Integer.BYTES;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove empty line

@mjsax mjsax merged commit adf5cc5 into apache:trunk Apr 15, 2022
@mjsax
Copy link
Copy Markdown
Member

mjsax commented Apr 15, 2022

Merged to trunk. Thanks for the fix!

philipnee added a commit to philipnee/kafka that referenced this pull request Apr 22, 2022
revert test commit

test commit

revert test commit

MINOR: Supplement the description of `Valid Values` in the documentation of `compression.type` (apache#11985)

Because a validator is added to ProducerConfig.COMPRESSION_TYPE_CONFIG and KafkaConfig.CompressionTypeProp, the corresponding testCase is improved to verify whether the wrong value of compression.type will throw a ConfigException.

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

MINOR: Correct Connect docs on connector/task states (apache#11914)

The `DESTROYED` state is represented internally as a tombstone record when running in distributed mode and by the removal of the connector/task from the in-memory status map when running in standalone mode. As a result, it will never appear to users of the REST API, and we should remove mention of it from our docs so that developers creating tooling against the REST API don't write unnecessary logic to account for that state.

Reviewers: Mickael Maison <mickael.maison@gmail.com>

KAFKA-13828; Ensure reasons sent by the consumer are small (apache#12043)

This PR reworks the reasons used in the ConsumerCoordinator to ensure that they remain reasonably short.

Reviewers: Bruno Cadonna <bruno@confluent.io>

KAFKA-13542: Add rebalance reason in Kafka Streams (apache#12018)

Reviewers: Bruno Cadonna <bruno@confluent.io>, David Jacot <djacot@confluent.io>

MINOR: Verify stopReplica if broker epoch not stale (apache#12040)

Verify that ReplicaManager.stopReplica is called if the stop replica
request doesn't result in a stale broker epoch error.

Reviewers: Mickael Maison <mimaison@users.noreply.github.com>

KAFKA-13651; Add audit logging to `StandardAuthorizer` (apache#12031)

This patch adds audit support through the kafka.authorizer.logger logger to StandardAuthorizer. It
follows the same conventions as AclAuthorizer with a similarly formatted log message. When
logIfAllowed is set in the Action, then the log message is at DEBUG level; otherwise, we log at
trace. When logIfDenied is set, then the log message is at INFO level; otherwise, we again log at
TRACE.

Reviewers: Colin P. McCabe <cmccabe@apache.org>

KAFKA-13743: Prevent topics with conflicting metrics names from being created in KRaft mode apache#11910

In ZK mode, the topic "foo_bar" will conflict with "foo.bar" because of limitations in metric
names. We should implement this in KRaft mode.  This PR also changes TopicCommandIntegrationTest to
support KRaft mode.

Reviewers: Colin P. McCabe <cmccabe@apache.org>

KAFKA-12613: Fix inconsistent validation logic between KafkaConfig and LogConfig (apache#10472)

Reviewers: Mickael Maison <mickael.maison@gmail.com>

KAFKA-13823 Feature flag changes from KIP-778 (apache#12036)

This PR includes the changes to feature flags that were outlined in KIP-778.  Specifically, it
changes UpdateFeatures and FeatureLevelRecord to remove the maximum version level. It also adds
dry-run to the RPC so the controller can actually attempt the upgrade (rather than the client). It
introduces an upgrade type enum, which supersedes the allowDowngrade boolean. Because
FeatureLevelRecord was unused previously, we do not need to introduce a new version.

The kafka-features.sh tool was overhauled in KIP-778 and now includes the describe, upgrade,
downgrade, and disable sub-commands.  Refer to
[KIP-778](https://cwiki.apache.org/confluence/display/KAFKA/KIP-778%3A+KRaft+Upgrades) for more
details on the new command structure.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, dengziming <dengziming1993@gmail.com>

MINOR: Update LICENSE-binary (apache#12051)

Updates the license file.

Reviewer: Bill Bejeck <bbejeck@apache.org>

MINOR: Remove redundant conditional judgments in Selector.clear() (apache#12048)

Condition 'sendFailed' is always 'false' when reached.

Reviewers: Guozhang Wang <wangguoz@gmail.com>

MINOR: ignore unused configuration when ConsumerCoordinator is not constructed (apache#12041)

Following PR apache#11940, ignore unused config when ConsumerCoordinator is not constructed.

Reviewers: Guozhang Wang <wangguoz@gmail.com>

[MINOR] Update upgrade documentation for 3.2 (apache#12055)

Reviewer: Bruno Cadonna <cadonna@apache.org>

MINOR: Move some TopicCommand and ConfigCommand integration tests to unit tests (apache#12024)

Move some TopicCommand and ConfigCommand integration tests to unit tests to speed up the tests

Reviewers: Luke Chen <showuon@gmail.com>

MINOR: Improve the description of principal under different mechanisms of sasl (apache#11947)

Reviewers: Mickael Maison <mickael.maison@gmail.com>

MINOR: update comment in LocalLog.replaceSegments() (apache#12054)

Reviewers: Luke Chen <showuon@gmail.com>

MINOR: Make link in quickstart dynamic (apache#12057)

Reviewer: Matthias J. Sax <mjsax@apache.org>

KAFKA-13769: Explicitly route FK join results to correct partitions (apache#11945)

Prior to this commit FK response sink routed FK results to
SubscriptionResolverJoinProcessorSupplier using the primary key.

There are cases, where this behavior is incorrect. For example,
if KTable key serde differs from the data source serde which might
happen without a key changing operation.

Instead of determining the resolver partition by serializing the PK
this patch includes target partition in SubscriptionWrapper payloads.
Default FK response-sink partitioner extracts the correct partition
from the value and routes the message accordingly.

Reviewers: Matthias J. Sax <matthias@confluent.io>

KAFKA-13807: Fix incrementalAlterConfig and refactor some things (apache#12033)

Ensure that we can set log.flush.interval.ms at the broker or cluster level via
IncrementalAlterConfigs. This was broken by KAFKA-13749, which added log.flush.interval.ms as the
second synonym rather than the first. Add a regression test to DynamicConfigChangeTest.

Create ControllerRequestContext and pass it to every controller API. This gives us a uniform way to
pass through information like the deadline (if there is one) and the Kafka principal which is
making the request (in the future we will want to log this information).

In ControllerApis, enforce a timeout for broker heartbeat requests which is equal to the heartbeat
request interval, to avoid heartbeats piling up on the controller queue. This should have been done
previously, but we overlooked it.

Add a builder for ClusterControlManager and ReplicationControlManager to avoid the need to deal
with a lot of churn (especially in test code) whenever a new constructor parameter gets added for
one of these.

In ControllerConfigurationValidator, create a separate function for when we just want to validate
that a ConfigResource is a valid target for DescribeConfigs. Previously we had been re-using the
validation code for IncrementalAlterConfigs, but this was messy.

Split out the replica placement code into a separate package and reorganize it a bit.

Reviewers: David Arthur <mumrah@gmail.com

MINOR: Correct spelling errors in KafkaRaftClient (apache#12061)

Correct spelling errors in KafkaRaftClient

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>

MINOR: Fix TestDowngrade.test_upgrade_and_downgrade (apache#12027)

The second validation does not verify the second bounce because the verified producer and the verified consumer are stopped in `self.run_validation`. This means that the second `run_validation` just spit out the same information as the first one. Instead, we should just run the validation at the end.

Reviewers: Jason Gustafson <jason@confluent.io>

KAFKA-8785: fix request timeout by waiting for metadata cache up-to-date (apache#11681)

The reason why this test is flaky is because we have race condition at the beginning of the test, when brokers are staring up, and the adminClient is requesting for brokers metadata. Once the adminClient only got partial metadata, the test will fail, because in these tests, brokers will be shutdown to test leader election.

Fix this issue by explicitly waiting for metadata cache up-to-date in waitForReadyBrokers, and let admin client get created after waitForReadyBrokers.

Reviewers: Jason Gustafson <jason@confluent.io>, David Jacot <djacot@confluent.io>, dengziming <dengziming1993@gmail.com>

KAFKA-13832: Fix flaky testAlterAssignment (apache#12060)

In KRaft mode the metadata is not propagate in time, so we should should wait for it before make assertions.

Reviewers:  Luke Chen <showuon@gmail.com>

KAFKA-13654: Extend KStream process with new Processor API (apache#11993)

Updates the KStream process API to cover the use cases
of both process and transform, and deprecate the KStream transform API.

Implements KIP-820

Reviewer: John Roesler <vvcephei@apache.org>

KAFKA-13835: Fix two bugs related to dynamic broker configs in KRaft (apache#12063)

Fix two bugs related to dynamic broker configs in KRaft. The first bug is that we are calling reloadUpdatedFilesWithoutConfigChange when a topic configuration is changed, but not when a
broker configuration is changed. This is backwards. This function must be called only for broker
configs, and never for topic configs or cluster configs.

The second bug is that there were several configurations such as max.connections which are related
to broker listeners, but which do not involve changing the registered listeners. We should support
these configurations in KRaft. This PR fixes the configuration change validation to support this case.

Reviewers: Jason Gustafson <jason@confluent.io>, Matthew de Detrich <mdedetrich@gmail.com>

KAFKA-10095: Add stricter assertion in LogCleanerManagerTest (apache#12004)

Reviewers: Mickael Maison <mickael.maison@gmail.com>

MINOR: Scala cleanups in core (apache#12058)

Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>

KAFKA-13799: Improve documentation for Kafka zero-copy (apache#12052)

Improve documentation for Kafka zero-copy. Kafka combines pagecache and zero-copy to greatly improve message consumption efficiency. But zero-copy only works in PlaintextTransportLayer.

Reviewers: Divij Vaidya <divijvaidya13@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

KAFKA-13785: [5/N][emit final] cache for time ordered window store (apache#12030)

A new cache for RocksDBTimeOrderedWindowStore. Need this because RocksDBTimeOrderedWindowStore's key ordering is different from CachingWindowStore which has issues for MergedSortedCacheWindowStoreIterator

Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>

KAFKA-13588: consolidate `changelogFor` methods to simplify the generation of internal topic names (apache#11703)

Reviewers: Guozhang Wang <wangguoz@gmail.com>

HOTFIX: fix broken trunk due to conflicting and overlapping commits (apache#12074)

Reviewers: Victoria Xia <victoria.xia@confluent.io>, David Arthur <mumrah@gmail.com>

MINOR: cleanup for postProcessAndValidateIdempotenceConfigs method (apache#12069)

Reviewers: Luke Chen <showuon@gmail.com>

KAFKA-13834: fix drain batch starving issue (apache#12066)

In drainBatchesForOneNode method, there's possibility causing some partitions in a node will never get picked. Fix this issue by maintaining a drainIndex for each node.

Reviewers: Luke Chen <showuon@gmail.com>, RivenSun <91005273+RivenSun2@users.noreply.github.com>

MINOR: Improve Gradle Caching and Fix Deprecations (apache#12003)

* Fix UP-TO-DATE check in `create*VersionFile` tasks

`create*VersionFile` tasks explicitly declared output UP-TO-DATE status
 as being false. This change properly sets the inputs to
`create*VersionFile` tasks to the `commitId` and `version` values and
sets `receiptFile` locally rather than in an extra property.

* Enable output caching for `process*Messages` tasks

`process*Messages` tasks did not have output caching enabled. This
change enables that caching, as well as setting a property name and
RELATIVE path sensitivity.

* Fix existing Gradle deprecations

Replaces `JavaExec#main` with `JavaExec#mainClass`

Replaces `Report#destination` with `Report#outputLocation`

Adds a `generator` configuration to projects that need to resolve
the `generator` project (rather than referencing the runtimeClasspath
of the `generator` project from other project contexts.

Reviewers: Mickael Maison <mickael.maison@gmail.com>


MINOR; Fix partition change record noop check (apache#12073)

When LeaderRecoveryState was added to the PartitionChangeRecord, the
check for being a noop was not updated. This commit fixes that and
improves the associated test to avoid this oversight in the future.

Reviewers: Colin Patrick McCabe <cmccabe@apache.org>

MINOR: revert back to 60s session timeout for static membership test (apache#11881)

Reviewers: Guozhang Wang <wangguoz@gmail.com>

KAFKA-13841: Fix a case where we were unable to place on fenced brokers in KRaft mode (apache#12075)

This PR fixes a case where we were unable to place on fenced brokers In KRaft mode. Specifically,
if we had a broker registration in the metadata log, but no associated heartbeat, previously the
HeartbeatManager would not track the fenced broker. This PR fixes this by adding this logic to the
metadata log replay path in ClusterControlManager.

Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants