
KAFKA-10173: Fix suppress changelog binary schema compatibility #8905

Merged

vvcephei merged 23 commits into apache:trunk from vvcephei:kafka-10173-improve-header-comparison
Jun 27, 2020

Conversation

@vvcephei (Contributor) commented Jun 19, 2020

We inadvertently changed the binary schema of the suppress buffer changelog
in 2.4.0 without bumping the schema version number. As a result, it is impossible
to upgrade from 2.3.x to 2.4+ if you are using suppression.

  • Refactor the schema compatibility test to use serialized data from older versions
    as a more foolproof compatibility test.
  • Refactor the upgrade system test to use the smoke test application so that we
    actually exercise a significant portion of the Streams API during upgrade testing
  • Add more recent versions to the upgrade system test matrix
  • Fix the compatibility bug by bumping the schema version to 3

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Comment on lines +292 to +300
if (record.partition() != partition) {
    throw new IllegalStateException(
        String.format(
            "record partition [%d] is being restored by the wrong suppress partition [%d]",
            record.partition(),
            partition
        )
    );
}
Contributor Author

On the side, I realized we can consolidate this check and perform it first, rather than after we've already written bad data into the buffer.

     )
 );
-} else if (V_1_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
+} else if (Arrays.equals(record.headers().lastHeader("v").value(), V_1_CHANGELOG_HEADER_VALUE)) {
Contributor Author

This is the fix (although it was probably fine before). The implementation of Header.equals is not specified by any contract, so it's safer to perform a direct comparison on the header values. Just as before, I'm comparing byte arrays to avoid deserializing the value.
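In sketch form, the idiom looks like this (using the constants shown just below; a minimal sketch, not the PR's literal code):

    // compare the raw bytes of the version header directly; no reliance on
    // Header.equals, and still no deserialization of the changelog value
    final Header versionHeader = record.headers().lastHeader("v");
    final boolean isV1 = versionHeader != null
        && Arrays.equals(versionHeader.value(), V_1_CHANGELOG_HEADER_VALUE);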

private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
private static final byte[] V_1_CHANGELOG_HEADER_VALUE = {(byte) 1};
private static final RecordHeaders V_1_CHANGELOG_HEADERS =
Member

My IDEA says this variable is never used.

Contributor

I saw it is used on line 342.

Member

My bad. The unused variable is V_1_CHANGELOG_HEADERS rather than V_1_CHANGELOG_HEADER_VALUE.

Contributor Author

Ah, right. My mistake. Thanks for pointing it out.

        );
    }
} else {
    if (record.headers().lastHeader("v") == null) {
Member

nit: We look up the last header many times. Could we reuse the return value?

Contributor Author

Sure!
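For illustration, the consolidated lookup might look like this (a hypothetical distillation of the restoreBatch branching discussed in this thread, with illustrative names; not the PR's exact code):

    import java.util.Arrays;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.header.Header;

    final class VersionDispatchSketch {
        private static final byte[] V_1 = {(byte) 1};
        private static final byte[] V_2 = {(byte) 2};
        private static final byte[] V_3 = {(byte) 3};

        // Look up the "v" header once, then branch on its raw bytes.
        static int schemaVersion(final ConsumerRecord<byte[], byte[]> record) {
            final Header versionHeader = record.headers().lastHeader("v");
            if (versionHeader == null) {
                return 0; // legacy record: no version header was ever written
            } else if (Arrays.equals(versionHeader.value(), V_1)) {
                return 1;
            } else if (Arrays.equals(versionHeader.value(), V_2)) {
                return 2; // may actually be v3-format data; see the duck-typing discussion below
            } else if (Arrays.equals(versionHeader.value(), V_3)) {
                return 3;
            } else {
                throw new IllegalArgumentException("unrecognized changelog version header");
            }
        }
    }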

@guozhangwang (Contributor) left a comment

The PR LGTM; please feel free to merge after addressing @chia7712's comments.

@vvcephei changed the title from "KAFKA-10173: Directly use Arrays.equals for version comparison" to "KAFKA-10173: Fix suppress changelog binary schema compatibility" on Jun 25, 2020
@vvcephei (Contributor Author)

I'm still cleaning up this PR. I'll call for reviews when it's ready.

John Roesler added 6 commits June 25, 2020 21:00
I was getting this exception, and somehow, the parallel GC parameter was the culprit:

    java.lang.OutOfMemoryError: Java heap space
        at org.apache.kafka.streams.kstream.internals.FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(FullChangeSerde.java:82)
        at org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV2(TimeOrderedKeyValueBufferChangelogDeserializationHelper.java:90)
        at org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.duckTypeV2(TimeOrderedKeyValueBufferChangelogDeserializationHelper.java:61)
        at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:369)
        at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$284/0x00000001002cb440.restoreBatch(Unknown Source)
        at org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferTest.shouldRestoreV3FormatWithV2Header(TimeOrderedKeyValueBufferTest.java:742)
@vvcephei (Contributor Author) left a comment

Whew! Can you take another look, @guozhangwang and @chia7712?

After finding the root cause, I was able to fix several related problems. The diff size is unfortunate, but it's almost all the result of copy/pasting the smoke test into the upgrade-system-tests modules.

Comment thread: build.gradle (outdated)

 defaultMaxHeapSize = "2g"
-defaultJvmArgs = ["-Xss4m", "-XX:+UseParallelGC"]
+defaultJvmArgs = ["-Xss4m"]
Contributor Author

@ijuma, you'll probably want to know about this.

I have no idea why, but one of the new tests in this PR was failing with:

    java.lang.OutOfMemoryError: Java heap space
        at org.apache.kafka.streams.kstream.internals.FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(FullChangeSerde.java:82)
        at org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV2(TimeOrderedKeyValueBufferChangelogDeserializationHelper.java:90)
        at org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper.duckTypeV2(TimeOrderedKeyValueBufferChangelogDeserializationHelper.java:61)
        at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:369)
        at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer$$Lambda$284/0x00000001002cb440.restoreBatch(Unknown Source)
        at org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferTest.shouldRestoreV3FormatWithV2Header(TimeOrderedKeyValueBufferTest.java:742)

I captured a flight recording and a heap dump on exit, but everything looked fine, and the heap was only a few megs at the time of the crash. I noticed that if I just overrode all the JVM args, the test would pass, and through trial and error, I identified this one as the "cause".

I get an OOME every time with -XX:+UseParallelGC, and I've never gotten it without the flag. WDYT about dropping it?

Contributor Author

Aha! I figured it out. There actually was a bug in the test. While duck-typing, the code was trying to allocate an array of 1.8GB. It's funny that disabling this flag made the test pass on Java 11 and 14. Maybe the flag partitions the heap on those versions or something, so the test didn't actually have the full 2GB available. Anyway, I'm about to push a fix and put the flag back.

* We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here
* so that we can produce the legacy format to test that we can still deserialize it.
*/
public static byte[] mergeChangeArraysIntoSingleLegacyFormattedArray(final Change<byte[]> serialChange) {
Contributor Author

Only used in the test now, so I moved it.
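A hedged sketch of how the retained legacy writer backs the compatibility test (assumed shape, including the assumption that decomposeLegacyFormattedArrayIntoChangeArrays returns a Change<byte[]>, which the stack trace above suggests; not the PR's literal test code):

    // Round-trip: write with the frozen legacy writer, read with the current
    // deserializer. If anyone changes the binary schema without bumping the
    // version, this fails.
    final Change<byte[]> change = new Change<>(
        "new".getBytes(StandardCharsets.UTF_8),
        "old".getBytes(StandardCharsets.UTF_8)
    );
    final byte[] legacyBytes = mergeChangeArraysIntoSingleLegacyFormattedArray(change);
    final Change<byte[]> decoded =
        FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyBytes);
    assertThat(decoded.newValue, is(change.newValue)); // Hamcrest's equalTo compares array contents
    assertThat(decoded.oldValue, is(change.oldValue));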

 if (oldValue == null) {
     buffer.putInt(NULL_VALUE_SENTINEL);
-} else if (priorValue == oldValue) {
+} else if (Arrays.equals(priorValue, oldValue)) {
Contributor Author

This was correct before, since we check equality and enforce identity in the constructor, but Arrays.equals is extremely cheap when the arrays are identical, so explicitly doing an identity check instead of an equality check was just a micro-optimization.
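Concretely, Arrays.equals begins with a reference-equality check, so the identical-arrays case stays essentially free:

    final byte[] oldValue = {1, 2, 3};
    final byte[] priorValue = oldValue; // the constructor enforces identity for equal values

    // old check: pure identity
    assert priorValue == oldValue;
    // new check: Arrays.equals(a, a2) returns true immediately when a == a2
    assert java.util.Arrays.equals(priorValue, oldValue);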

Comment on lines +65 to +69
private static final byte[] V_1_CHANGELOG_HEADER_VALUE = {(byte) 1};
private static final byte[] V_2_CHANGELOG_HEADER_VALUE = {(byte) 2};
private static final byte[] V_3_CHANGELOG_HEADER_VALUE = {(byte) 3};
static final RecordHeaders CHANGELOG_HEADERS =
    new RecordHeaders(new Header[] {new RecordHeader("v", V_3_CHANGELOG_HEADER_VALUE)});
Contributor Author

We don't need to store the whole RecordHeaders for the old versions, just the actual version flag.

// in this case, the changelog value is a serialized BufferValue
} else if (Arrays.equals(versionHeader.value(), V_2_CHANGELOG_HEADER_VALUE)) {
    final DeserializationResult deserializationResult = duckTypeV2(record, key);
Contributor Author

See the comment on this method for why we need to duck-type version 2. I pulled these deserializations into a helper class because all the extra branches pushed our cyclomatic complexity over the limit.

But I kept the first two branches here because they aren't pure functions. They perform a lookup in the buffer itself as part of converting the old format.

Contributor

Could you clarify which comment you are referring to? I did not see any comments on the "restoreBatch" method.

Contributor Author

Sorry, the comments in duckTypeV2.

Basically, because we released three versions that wrote data in the "v3" format but with the "v2" flag, when we see the v2 flag the data might be in either v2 or v3 format. The only way to tell is to try to deserialize it in v2 format and, if we get an exception, to try the v3 format.
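A minimal sketch of that fallback (deserializeV3 and the caught exception type are assumptions for illustration, not the PR's exact code):

    // A v2-flagged record might really be v3 format, so parse optimistically
    // and fall back on failure.
    static DeserializationResult duckTypeV2(final ConsumerRecord<byte[], byte[]> record,
                                            final Bytes key) {
        try {
            return deserializeV2(record, key); // a genuine v2 record parses cleanly
        } catch (final RuntimeException probablyV3) {
            // the affected releases wrote v3-format data under the v2 flag,
            // so a failed v2 parse most likely means the record is v3 format
            return deserializeV3(record, key);
        }
    }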

Comment thread: tests/kafkatest/services/streams.py (outdated)
Comment on lines +330 to +335
"replication.factor": self.REPLICATION_FACTOR,
"num.standby.replicas": 2,
"buffered.records.per.partition": 100,
"commit.interval.ms": 1000,
"auto.offset.reset": "earliest",
"acks": "all"}
Contributor Author

Moved from the Java code so that all the configs can be defined together.

# can be replaced with metadata_2_versions
backward_compatible_metadata_2_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
metadata_3_or_higher_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5), str(DEV_VERSION)]
smoke_test_versions = [str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5)]
Contributor Author

See KAFKA-10203 for why I couldn't go past 2.2.


-@matrix(from_version=metadata_2_versions, to_version=metadata_2_versions)
+@matrix(from_version=smoke_test_versions, to_version=dev_version)
 def test_simple_upgrade_downgrade(self, from_version, to_version):
Contributor Author

We were previously not testing 2.0+ at all. After rewriting this as a smoke test, it only applies to 2.2+. I also figured it makes more sense just to test upgrades to the current branch, rather than testing cross-upgrades between every pair of versions.

Contributor

+1, I think this is a great find.

 self.zk.start()

-self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
+self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics={
Contributor Author

A lot of these changes are part of adapting the test to the smoke test app.

@@ -349,56 +370,42 @@ def get_version_string(self, version):
def start_all_nodes_with(self, version):
Contributor Author

I refactored this method to start all the nodes concurrently, rather than one at a time. We still do a rolling upgrade, but there's no need to do a rolling startup.

import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;

public class StreamsSmokeTest {
Contributor

I'm assuming the 22..25 client / driver code are all copy-pastes here, so I skipped reviewing them. LMK if they aren't.

Contributor Author

That's correct.


return deserializationResult;
}

private static DeserializationResult deserializeV2(final ConsumerRecord<byte[], byte[]> record,
Contributor

Some docs, either here or directly inside InMemoryTimeOrderedKeyValueBuffer.java, explaining the format differences would help a lot. You can see some examples, like object GroupMetadataManager.

Contributor Author

Sure thing!

changelogTopic,
key,
null,
null,
Contributor

I'm just thinking: maybe we should add headers to tombstones too, in case we change the semantics of tombstones in the future?

Contributor Author

I remember considering this when I added the first version header. The reason I didn't is that, since the initial version didn't have any headers, even if we change the tombstone format in the future, we'll always have to interpret a "no header, null value" record as being a "legacy format" tombstone, just like we have to interpret a "no header, non-null value" as being a "legacy format" data record.

You can think of "no header" as indicating "version 0". Since we haven't changed the format of tombstones yet, there's no value in adding a "version 1" flag. We should just wait until we do need to make such a change (if ever).

Contributor

SG

@guozhangwang (Contributor)

test this

@vvcephei (Contributor Author)

Hmm. Still saw the OOME in https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3134/

@vvcephei (Contributor Author)

Retest this please

@vvcephei (Contributor Author)

Ah, that heap space thing was legit. Fix coming...

@vvcephei (Contributor Author)

Hey @guozhangwang, you might want to take a look at that last fix. The duck-typing code was sometimes producing an OOME when it would interpret a random integer out of the buffer as a "size" and blindly allocate an array of that size.

I added a Util (with tests) that has a guard to prevent this.
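A hedged sketch of that kind of guard (a hypothetical helper, not necessarily the exact utility added in the PR):

    import java.nio.ByteBuffer;

    final class SafeReads {
        // Read a size-prefixed, possibly-null byte array, but bound the allocation
        // by the bytes actually remaining in the buffer: a garbage "size" read while
        // duck-typing a record in the wrong format then fails fast instead of
        // triggering a giant allocation and an OutOfMemoryError.
        static byte[] getNullableSizePrefixedArray(final ByteBuffer buffer) {
            final int size = buffer.getInt();
            if (size == -1) {
                return null; // null-value sentinel
            }
            if (size < 0 || size > buffer.remaining()) {
                throw new IllegalArgumentException(
                    "attempted to read an array of size " + size +
                        ", but only " + buffer.remaining() + " bytes remain"
                );
            }
            final byte[] array = new byte[size];
            buffer.get(array);
            return array;
        }
    }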

@vvcephei (Contributor Author)

I will follow up shortly to extract the system tests to a separate PR, since we're having trouble running the tests at all right now, and we wouldn't know if they are even more broken.

@vvcephei (Contributor Author)

OK, @guozhangwang, this is my "final" iteration. I pulled the system tests out, and I'll follow up with another PR later. This PR should be sufficient for the basic purpose, thanks to the new "binary" compatibility unit tests.

@guozhangwang (Contributor) left a comment

LGTM. We can merge after green build.

Let's trigger system test builds on the follow-up PR

@vvcephei (Contributor Author)

Failures were unrelated:

kafka.api.PlaintextConsumerTest > testLowMaxFetchSizeForRequestAndPartition FAILED
    org.scalatest.exceptions.TestFailedException: Timed out before consuming expected 2700 records. The number consumed was 1077.
        at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
        at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
        at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
        at org.scalatest.Assertions.fail(Assertions.scala:1091)
        at org.scalatest.Assertions.fail$(Assertions.scala:1087)
        at org.scalatest.Assertions$.fail(Assertions.scala:1389)
        at kafka.api.AbstractConsumerTest.consumeRecords(AbstractConsumerTest.scala:158)
        at kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition(PlaintextConsumerTest.scala:804)
org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > testReplication FAILED
    java.lang.RuntimeException: Could not find enough records. found 0, expected 100
        at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.consume(EmbeddedKafkaCluster.java:435)
        at org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:217)
kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition
org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorStop

@vvcephei merged commit 8319389 into apache:trunk on Jun 27, 2020
@vvcephei deleted the kafka-10173-improve-header-comparison branch on June 27, 2020 02:41
vvcephei added a commit that referenced this pull request Jun 27, 2020
We inadvertently changed the binary schema of the suppress buffer changelog
in 2.4.0 without bumping the schema version number. As a result, it is impossible
to upgrade from 2.3.x to 2.4+ if you are using suppression.

* Refactor the schema compatibility test to use serialized data from older versions
as a more foolproof compatibility test.
* Refactor the upgrade system test to use the smoke test application so that we
actually exercise a significant portion of the Streams API during upgrade testing
* Add more recent versions to the upgrade system test matrix
* Fix the compatibility bug by bumping the schema version to 3

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
@vvcephei (Contributor Author)

Backported to 2.6, 2.5, and 2.4. I ran the streams and client tests each time, as well as systemTestLibs.

Kvicii pushed a commit to Kvicii/kafka that referenced this pull request Jun 27, 2020
* 'trunk' of github.com:apache/kafka:
  KAFKA-10180: Fix security_config caching in system tests (apache#8917)
  KAFKA-10173: Fix suppress changelog binary schema compatibility (apache#8905)
  KAFKA-10166: always write checkpoint before closing an (initialized) task (apache#8926)
  MINOR: Rename SslTransportLayer.State."NOT_INITALIZED" enum value to "NOT_INITIALIZED"
  MINOR: Update Scala to 2.13.3 (apache#8931)
  KAFKA-9076: support consumer sync across clusters in MM 2.0 (apache#7577)
  MINOR: Remove Diamond and code code Alignment (apache#8107)
  KAFKA-10198: guard against recycling dirty state (apache#8924)