diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 8b73388b35579..143ae68a31619 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -629,7 +629,7 @@ private void initializeChangelogs(final Set newPartitionsToRe final ChangelogMetadata changelogMetadata = changelogs.get(partition); final Long endOffset = endOffsets.get(partition); final Long committedOffset = newPartitionsToFindCommittedOffset.contains(partition) ? - committedOffsets.get(partition) : Long.MAX_VALUE; + committedOffsets.get(partition) : Long.valueOf(Long.MAX_VALUE); if (endOffset != null && committedOffset != null) { if (changelogMetadata.restoreEndOffset != null) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 0974570641af9..7cfe510814124 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -2886,10 +2886,10 @@ public void shouldSupportTriggerMaterializedWithKTableFromKStream() { builder.stream(input, consumed) .toTable() .mapValues( - value -> value.charAt(0) - (int) 'a', - Materialized.>as(storeName) + value -> String.valueOf(value.charAt(0) - (int) 'a'), + Materialized.>as(storeName) .withKeySerde(Serdes.String()) - .withValueSerde(Serdes.Integer())) + .withValueSerde(Serdes.String())) .toStream() .to(output); @@ -2925,8 +2925,8 @@ public void shouldSupportTriggerMaterializedWithKTableFromKStream() { Instant.ofEpochMilli(0L))) { final TestInputTopic inputTopic = driver.createInputTopic(input, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); - final TestOutputTopic outputTopic = - driver.createOutputTopic(output, Serdes.String().deserializer(), Serdes.Integer().deserializer()); + final TestOutputTopic outputTopic = + driver.createOutputTopic(output, Serdes.String().deserializer(), Serdes.String().deserializer()); final KeyValueStore store = driver.getKeyValueStore(storeName); inputTopic.pipeInput("A", "green", 10L); @@ -2935,21 +2935,21 @@ public void shouldSupportTriggerMaterializedWithKTableFromKStream() { inputTopic.pipeInput("C", "yellow", 15L); inputTopic.pipeInput("D", "green", 11L); - final Map expectedStore = new HashMap<>(); - expectedStore.putIfAbsent("A", 1); - expectedStore.putIfAbsent("B", 6); - expectedStore.putIfAbsent("C", 24); - expectedStore.putIfAbsent("D", 6); + final Map expectedStore = new HashMap<>(); + expectedStore.putIfAbsent("A", "1"); + expectedStore.putIfAbsent("B", "6"); + expectedStore.putIfAbsent("C", "24"); + expectedStore.putIfAbsent("D", "6"); assertEquals(expectedStore, asMap(store)); assertEquals( asList( - new TestRecord<>("A", 6, Instant.ofEpochMilli(10)), - new TestRecord<>("B", 6, Instant.ofEpochMilli(9)), - new TestRecord<>("A", 1, Instant.ofEpochMilli(12)), - new TestRecord<>("C", 24, Instant.ofEpochMilli(15)), - new TestRecord<>("D", 6, Instant.ofEpochMilli(11))), + new TestRecord<>("A", "6", Instant.ofEpochMilli(10)), + new TestRecord<>("B", "6", Instant.ofEpochMilli(9)), + new TestRecord<>("A", "1", Instant.ofEpochMilli(12)), + new TestRecord<>("C", "24", Instant.ofEpochMilli(15)), + new TestRecord<>("D", "6", Instant.ofEpochMilli(11))), outputTopic.readRecordsToList()); }