From 12724dc650bd53fea3f985fc819cf93a4012ffcc Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 7 Feb 2020 13:10:26 -0800 Subject: [PATCH 1/2] fix two failures in JDK11 --- .../internals/StoreChangelogReader.java | 2 +- .../kstream/internals/KStreamImplTest.java | 35 ++++++++++--------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 8b73388b35579..143ae68a31619 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -629,7 +629,7 @@ private void initializeChangelogs(final Set newPartitionsToRe final ChangelogMetadata changelogMetadata = changelogs.get(partition); final Long endOffset = endOffsets.get(partition); final Long committedOffset = newPartitionsToFindCommittedOffset.contains(partition) ? - committedOffsets.get(partition) : Long.MAX_VALUE; + committedOffsets.get(partition) : Long.valueOf(Long.MAX_VALUE); if (endOffset != null && committedOffset != null) { if (changelogMetadata.restoreEndOffset != null) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 0974570641af9..ee8a158e0738e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -2886,10 +2886,10 @@ public void shouldSupportTriggerMaterializedWithKTableFromKStream() { builder.stream(input, consumed) .toTable() .mapValues( - value -> value.charAt(0) - (int) 'a', - Materialized.>as(storeName) + value -> String.valueOf(value.charAt(0) - (int) 'a'), + Materialized.>as(storeName) .withKeySerde(Serdes.String()) - .withValueSerde(Serdes.Integer())) + .withValueSerde(Serdes.String())) .toStream() .to(output); @@ -2925,8 +2925,8 @@ public void shouldSupportTriggerMaterializedWithKTableFromKStream() { Instant.ofEpochMilli(0L))) { final TestInputTopic inputTopic = driver.createInputTopic(input, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); - final TestOutputTopic outputTopic = - driver.createOutputTopic(output, Serdes.String().deserializer(), Serdes.Integer().deserializer()); + final TestOutputTopic outputTopic = + driver.createOutputTopic(output, Serdes.String().deserializer(), Serdes.String().deserializer()); final KeyValueStore store = driver.getKeyValueStore(storeName); inputTopic.pipeInput("A", "green", 10L); @@ -2935,21 +2935,21 @@ public void shouldSupportTriggerMaterializedWithKTableFromKStream() { inputTopic.pipeInput("C", "yellow", 15L); inputTopic.pipeInput("D", "green", 11L); - final Map expectedStore = new HashMap<>(); - expectedStore.putIfAbsent("A", 1); - expectedStore.putIfAbsent("B", 6); - expectedStore.putIfAbsent("C", 24); - expectedStore.putIfAbsent("D", 6); + final Map expectedStore = new HashMap<>(); + expectedStore.putIfAbsent("A", "1"); + expectedStore.putIfAbsent("B", "6"); + expectedStore.putIfAbsent("C", "24"); + expectedStore.putIfAbsent("D", "6"); assertEquals(expectedStore, asMap(store)); assertEquals( asList( - new TestRecord<>("A", 6, Instant.ofEpochMilli(10)), - new TestRecord<>("B", 6, Instant.ofEpochMilli(9)), - new TestRecord<>("A", 1, Instant.ofEpochMilli(12)), - new TestRecord<>("C", 24, Instant.ofEpochMilli(15)), - new TestRecord<>("D", 6, Instant.ofEpochMilli(11))), + new TestRecord<>("A", "6", Instant.ofEpochMilli(10)), + new TestRecord<>("B", "6", Instant.ofEpochMilli(9)), + new TestRecord<>("A", "1", Instant.ofEpochMilli(12)), + new TestRecord<>("C", "24", Instant.ofEpochMilli(15)), + new TestRecord<>("D", "6", Instant.ofEpochMilli(11))), outputTopic.readRecordsToList()); } @@ -2957,7 +2957,10 @@ public void shouldSupportTriggerMaterializedWithKTableFromKStream() { private static Map asMap(final KeyValueStore store) { final HashMap result = new HashMap<>(); - store.all().forEachRemaining(kv -> result.put(kv.key, kv.value)); + store.all().forEachRemaining(kv -> { + System.out.println(kv); + result.put(kv.key, kv.value); + }); return result; } } From d1d11d5be1b71cdea3a64702c5749c1bc5a879eb Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 7 Feb 2020 13:49:52 -0800 Subject: [PATCH 2/2] github comments --- .../kafka/streams/kstream/internals/KStreamImplTest.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index ee8a158e0738e..7cfe510814124 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -2957,10 +2957,7 @@ public void shouldSupportTriggerMaterializedWithKTableFromKStream() { private static Map asMap(final KeyValueStore store) { final HashMap result = new HashMap<>(); - store.all().forEachRemaining(kv -> { - System.out.println(kv); - result.put(kv.key, kv.value); - }); + store.all().forEachRemaining(kv -> result.put(kv.key, kv.value)); return result; } }