Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,17 @@ private static class ValueList {
values[i] = min + i;
}
// We want to randomize the order of data to test not completely predictable processing order
// However, values are also used as the timestamp of the record. (TODO: separate data and timestamp)
// We keep some correlation of time and order. Thus, the shuffling is done with a sliding window
shuffle(values, 10);

index = 0;
}

int next() {
return (index < values.length) ? values[index++] : -1;
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test produces too much data, and thus ended up sending -1 for a lot of records at the end.

final int v = values[index++];
if (index >= values.length) {
index = 0;
}
Comment on lines +108 to +110
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this risk bringing a lot of disorder into the timestamps? I am referring to the comment on line 100. What are the consequences of such disorder?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems the comment is outdated. The custom TS-extractor was removed years ago: 52e3979

Let me delete the comment.

return v;
}
}

Expand All @@ -126,7 +128,7 @@ static void generatePerpetually(final String kafka,
data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
}

final Random rand = new Random();
final Random rand = new Random(System.currentTimeMillis());
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor side improvement


try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
while (true) {
Expand Down Expand Up @@ -313,7 +315,7 @@ public void onCompletion(final RecordMetadata metadata, final Exception exceptio
}

private static void shuffle(final int[] data, @SuppressWarnings("SameParameterValue") final int windowSize) {
final Random rand = new Random();
final Random rand = new Random(System.currentTimeMillis());
for (int i = 0; i < data.length; i++) {
// we shuffle data within windowSize
final int j = rand.nextInt(Math.min(data.length - i, windowSize)) + i;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,23 @@ public class SmokeTestUtil {

final static int END = Integer.MAX_VALUE;

// Returns a supplier for a minimal no-op processor that only logs task
// lifecycle events (init/close with task id) — used to observe task assignment.
static ProcessorSupplier<Object, Object, Void, Void> printTaskProcessorSupplier(final String topic) {
return () -> new SmokeTestProcessor(topic);
}

// Convenience overload: delegates to the two-argument variant with an empty name.
static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic) {
return printProcessorSupplier(topic, "");
}

static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic, final String name) {
return () -> new ContextualProcessor<Object, Object, Void, Void>() {
return () -> new SmokeTestProcessor(topic) {
private int numRecordsProcessed = 0;
private long smallestOffset = Long.MAX_VALUE;
private long largestOffset = Long.MIN_VALUE;

@Override
public void init(final ProcessorContext<Void, Void> context) {
super.init(context);
System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
System.out.flush();
numRecordsProcessed = 0;
smallestOffset = Long.MAX_VALUE;
largestOffset = Long.MIN_VALUE;
Expand All @@ -74,7 +76,7 @@ public void process(final Record<Object, Object> record) {

@Override
public void close() {
System.out.printf("Close processor for task %s%n", context().taskId());
super.close();
System.out.println("processed " + numRecordsProcessed + " records");
final long processed;
if (largestOffset >= smallestOffset) {
Expand Down Expand Up @@ -128,4 +130,27 @@ public static void sleep(final long duration) {
} catch (final Exception ignore) { }
}

/**
 * Minimal processor used purely for debugging/observability in the smoke test:
 * it logs task lifecycle events (init and close, including the task id) and
 * ignores all records. The printed topic name identifies where in the topology
 * this processor is attached.
 */
private static class SmokeTestProcessor extends ContextualProcessor<Object, Object, Void, Void> {
    private final String topic;

    // `final` added on the parameter for consistency with the rest of the file
    public SmokeTestProcessor(final String topic) {
        this.topic = topic;
    }

    @Override
    public void init(final ProcessorContext<Void, Void> context) {
        super.init(context);
        System.out.println("[DEV] initializing processor: topic=" + topic + " taskId=" + context.taskId());
        // flush so the line is visible even if the JVM exits abruptly
        System.out.flush();
    }

    @Override
    public void process(final Record<Object, Object> record) {
        // intentionally a no-op: this processor only logs lifecycle events
    }

    @Override
    public void close() {
        System.out.printf("Close processor for task %s%n", context().taskId());
        System.out.flush();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.tests;

import java.util.Arrays;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
Expand All @@ -24,18 +26,24 @@
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
Expand Down Expand Up @@ -69,6 +77,7 @@
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
import static org.apache.kafka.streams.tests.SmokeTestUtil.longSerde;
import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;

public class StreamsUpgradeTest {
Expand All @@ -84,6 +93,7 @@ public static void main(final String[] args) throws Exception {
System.out.println("StreamsTest instance started (StreamsUpgradeTest trunk)");
System.out.println("props=" + streamsProperties);

// Do not use try-with-resources here; otherwise KafkaStreams will be closed when `main()` exits
final KafkaStreams streams = buildStreams(streamsProperties);
streams.start();

Expand All @@ -107,6 +117,9 @@ public static KafkaStreams buildStreams(final Properties streamsProperties) {
final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
"test.run_fk_join",
"false"));
final boolean runTableAgg = Boolean.parseBoolean(streamsProperties.getProperty(
"test.run_table_agg",
"false"));
if (runFkJoin) {
try {
final KTable<Integer, String> fkTable = builder.table(
Expand All @@ -116,6 +129,23 @@ public static KafkaStreams buildStreams(final Properties streamsProperties) {
System.err.println("Caught " + e.getMessage());
}
}
if (runTableAgg) {
final String aggProduceValue = streamsProperties.getProperty("test.agg_produce_value", "");
if (aggProduceValue.isEmpty()) {
System.err.printf("'%s' must be specified when '%s' is true.", "test.agg_produce_value", "test.run_table_agg");
}
final String expectedAggValuesStr = streamsProperties.getProperty("test.expected_agg_values", "");
if (expectedAggValuesStr.isEmpty()) {
System.err.printf("'%s' must be specified when '%s' is true.", "test.expected_agg_values", "test.run_table_agg");
}
final List<String> expectedAggValue = Arrays.asList(expectedAggValuesStr.split(","));

try {
buildTableAgg(dataTable, aggProduceValue, expectedAggValue);
} catch (final Exception e) {
System.err.println("Caught " + e.getMessage());
}
}

final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
Expand Down Expand Up @@ -143,6 +173,94 @@ private static void buildFKTable(final KStream<String, Integer> primaryTable,
kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
}

private static void buildTableAgg(final KTable<String, Integer> sourceTable,
final String aggProduceValue,
final List<String> expectedAggValues) {
final KStream<Integer, String> result = sourceTable
.groupBy(
(k, v) -> new KeyValue<>(v, aggProduceValue),
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed this to use v as key -- works just fine

Grouped.with(intSerde, stringSerde))
.aggregate(
() -> new Agg(Collections.emptyList(), 0),
(k, v, agg) -> {
final List<String> seenValues;
final boolean updated;
if (!agg.seenValues.contains(v)) {
seenValues = new ArrayList<>(agg.seenValues);
seenValues.add(v);
Collections.sort(seenValues);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we need to sort?

updated = true;
} else {
seenValues = agg.seenValues;
updated = false;
}

final boolean shouldLog = updated || (agg.recordsProcessed % 10 == 0); // value of 10 is chosen for debugging purposes. can increase to 100 once test is passing.
if (shouldLog) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed this slightly to avoid spamming the output

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I'm not seeing what the change was. Should we increase the value in the line above from 10 too 100? Currently the comment still says "value of 10 is chosen for debugging purposes. can increase to 100 once test is passing"

Copy link
Copy Markdown
Member Author

@mjsax mjsax May 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally it was

if (shouldLog && seenValues.containsAll(expectedAggValues) {
 ...
} else {
 ...
}

So it always logged something.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, must've put that in for debugging and forgotten to leave a note. The new mechanism makes sense 👍 If you think recordsProcessed % 10 is the right frequency, rather than recordsProcessed % 100, then perhaps we can remove the comment entirely and just leave it as recordsProcessed % 10.

if (seenValues.containsAll(expectedAggValues)) {
System.out.printf("Table aggregate processor saw expected values. Seen: %s. Expected: %s%n", String.join(",", seenValues), String.join(",", expectedAggValues));
} else {
System.out.printf("Table aggregate processor did not see expected values. Seen: %s. Expected: %s%n", String.join(",", seenValues), String.join(",", expectedAggValues)); // this line for debugging purposes only.
}
}

return new Agg(seenValues, agg.recordsProcessed + 1);
},
(k, v, agg) -> agg,
Materialized.with(intSerde, new AggSerde()))
.mapValues((k, vAgg) -> String.join(",", vAgg.seenValues))
.toStream();

// adding dummy processor for better debugging (will print assigned tasks)
result.process(SmokeTestUtil.printTaskProcessorSupplier("table-repartition"));
result.to("table-agg-result", Produced.with(intSerde, stringSerde));
}

/**
 * Aggregation state for the table-agg smoke test: the distinct values seen so
 * far plus a count of processed records, with a hand-rolled binary encoding
 * (CSV of the values followed by an 8-byte serialized count).
 */
private static class Agg {
    private final List<String> seenValues;
    private final long recordsProcessed;

    Agg(final List<String> seenValues, final long recordsProcessed) {
        this.seenValues = seenValues;
        this.recordsProcessed = recordsProcessed;
    }

    // Layout: CSV of seenValues, then the serialized recordsProcessed count.
    byte[] serialize() {
        final byte[] rawSeenValues = stringSerde.serializer().serialize("", String.join(",", seenValues));
        final byte[] rawRecordsProcessed = longSerde.serializer().serialize("", recordsProcessed);
        return ByteBuffer
            .allocate(rawSeenValues.length + rawRecordsProcessed.length)
            .put(rawSeenValues)
            .put(rawRecordsProcessed)
            .array();
    }

    static Agg deserialize(final byte[] rawAgg) {
        // Everything except the trailing 8 bytes is the CSV of seen values.
        final byte[] rawSeenValues = new byte[rawAgg.length - 8];
        System.arraycopy(rawAgg, 0, rawSeenValues, 0, rawSeenValues.length);
        final String joinedValues = stringSerde.deserializer().deserialize("", rawSeenValues);
        // Bug fix: "".split(",") yields [""], so an Agg with an empty value list
        // would round-trip into a list containing one empty string, which would
        // then leak into the reported seen values. Map an empty payload back to
        // an empty list instead.
        final List<String> seenValues = joinedValues.isEmpty()
            ? Collections.emptyList()
            : Arrays.asList(joinedValues.split(","));

        final byte[] rawRecordsProcessed = new byte[8];
        System.arraycopy(rawAgg, rawAgg.length - 8, rawRecordsProcessed, 0, 8);
        final long recordsProcessed = longSerde.deserializer().deserialize("", rawRecordsProcessed);

        return new Agg(seenValues, recordsProcessed);
    }
}

/**
 * {@link Serde} for {@link Agg} that simply delegates to Agg's own binary
 * encoding and decoding; the topic argument is ignored.
 */
private static class AggSerde implements Serde<Agg> {

    @Override
    public Serializer<Agg> serializer() {
        return new Serializer<Agg>() {
            @Override
            public byte[] serialize(final String topic, final Agg agg) {
                return agg.serialize();
            }
        };
    }

    @Override
    public Deserializer<Agg> deserializer() {
        return new Deserializer<Agg>() {
            @Override
            public Agg deserialize(final String topic, final byte[] rawAgg) {
                return Agg.deserialize(rawAgg);
            }
        };
    }
}

private static class FutureKafkaClientSupplier extends DefaultKafkaClientSupplier {
@Override
public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,17 @@ private static class ValueList {
for (int i = 0; i < values.length; i++) {
values[i] = min + i;
}
// We want to randomize the order of data to test not completely predictable processing order
// However, values are also used as the timestamp of the record. (TODO: separate data and timestamp)
// We keep some correlation of time and order. Thus, the shuffling is done with a sliding window
shuffle(values, 10);

index = 0;
}

int next() {
    // Emit the value under the cursor, then advance it, wrapping back to the
    // start of the list when the end is reached so the sequence repeats forever.
    final int result = values[index];
    index = (index + 1) % values.length;
    return result;
}
}

Expand All @@ -126,7 +127,7 @@ static void generatePerpetually(final String kafka,
data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
}

final Random rand = new Random();
final Random rand = new Random(System.currentTimeMillis());

try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
while (true) {
Expand Down Expand Up @@ -309,7 +310,7 @@ public void onCompletion(final RecordMetadata metadata, final Exception exceptio
}

private static void shuffle(final int[] data, @SuppressWarnings("SameParameterValue") final int windowSize) {
final Random rand = new Random();
final Random rand = new Random(System.currentTimeMillis());
for (int i = 0; i < data.length; i++) {
// we shuffle data within windowSize
final int j = rand.nextInt(Math.min(data.length - i, windowSize)) + i;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,23 @@ public class SmokeTestUtil {

final static int END = Integer.MAX_VALUE;

// Returns a supplier for a minimal no-op processor that only logs task
// lifecycle events (init/close with task id) — used to observe task assignment.
static ProcessorSupplier<Object, Object, Void, Void> printTaskProcessorSupplier(final String topic) {
return () -> new SmokeTestProcessor(topic);
}

// Convenience overload: delegates to the two-argument variant with an empty name.
static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic) {
return printProcessorSupplier(topic, "");
}

static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic, final String name) {
return () -> new ContextualProcessor<Object, Object, Void, Void>() {
return () -> new SmokeTestProcessor(topic) {
private int numRecordsProcessed = 0;
private long smallestOffset = Long.MAX_VALUE;
private long largestOffset = Long.MIN_VALUE;

@Override
public void init(final ProcessorContext<Void, Void> context) {
super.init(context);
System.out.println("[3.3] initializing processor: topic=" + topic + " taskId=" + context.taskId());
System.out.flush();
numRecordsProcessed = 0;
smallestOffset = Long.MAX_VALUE;
largestOffset = Long.MIN_VALUE;
Expand All @@ -74,7 +76,7 @@ public void process(final Record<Object, Object> record) {

@Override
public void close() {
System.out.printf("Close processor for task %s%n", context().taskId());
super.close();
System.out.println("processed " + numRecordsProcessed + " records");
final long processed;
if (largestOffset >= smallestOffset) {
Expand Down Expand Up @@ -128,4 +130,27 @@ public static void sleep(final long duration) {
} catch (final Exception ignore) { }
}

/**
 * Minimal processor used purely for debugging/observability in the smoke test:
 * it logs task lifecycle events (init and close, including the task id) and
 * ignores all records. The printed topic name identifies where in the topology
 * this processor is attached.
 */
private static class SmokeTestProcessor extends ContextualProcessor<Object, Object, Void, Void> {
    private final String topic;

    // `final` added on the parameter for consistency with the rest of the file
    public SmokeTestProcessor(final String topic) {
        this.topic = topic;
    }

    @Override
    public void init(final ProcessorContext<Void, Void> context) {
        super.init(context);
        System.out.println("[3.3] initializing processor: topic=" + topic + " taskId=" + context.taskId());
        // flush so the line is visible even if the JVM exits abruptly
        System.out.flush();
    }

    @Override
    public void process(final Record<Object, Object> record) {
        // intentionally a no-op: this processor only logs lifecycle events
    }

    @Override
    public void close() {
        System.out.printf("Close processor for task %s%n", context().taskId());
        System.out.flush();
    }
}
}
Loading