Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public void shouldFanoutTheInput() throws Exception {
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KStream<byte[], String> stream1 = builder.stream(INPUT_TOPIC_A);
KStream<byte[], String> stream2 = stream1.mapValues(
Expand All @@ -119,10 +120,6 @@ public String apply(String value) {
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

// Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
// of the input data we produce below).
Thread.sleep(5000);

//
// Step 2: Produce some input data to the input topic.
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
Expand Down Expand Up @@ -123,7 +124,7 @@ public void shouldCompactTopicsForStateChangelogs() throws Exception {
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");

streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStreamBuilder builder = new KStreamBuilder();

KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
Expand All @@ -149,10 +150,6 @@ public KeyValue<String, String> apply(String key, String value) {
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

// Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
// of the input data we produce below).
Thread.sleep(5000);

//
// Step 2: Produce some input data to the input topic.
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ public void shouldCountClicksPerRegion() throws Exception {
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Explicitly place the state directory under /tmp so that we can remove it via
// `purgeLocalStreamsState` below. Once Streams is updated to expose the effective
// StreamsConfig configuration (so we can retrieve whatever state directory Streams came up
Expand Down Expand Up @@ -217,10 +218,6 @@ public Long apply(Long value1, Long value2) {
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

// Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
// of the input data we produce below).
Thread.sleep(10000);

//
// Step 2: Publish user-region information.
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public void shouldUppercaseTheInput() throws Exception {
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KStream<byte[], String> input = builder.stream(DEFAULT_INPUT_TOPIC);
KStream<byte[], String> uppercased = input.mapValues(new ValueMapper<String, String>() {
Expand All @@ -92,10 +93,6 @@ public String apply(String value) {
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

// Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
// of the input data we produce below).
Thread.sleep(5000);

//
// Step 2: Produce some input data to the input topic.
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,14 @@ public void shouldWriteTheInputDataAsIsToTheOutputTopic() throws Exception {
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Write the input data as-is to the output topic.
builder.stream(DEFAULT_INPUT_TOPIC).to(DEFAULT_OUTPUT_TOPIC);

KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

// Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
// of the input data we produce below).
Thread.sleep(5000);

//
// Step 2: Produce some input data to the input topic.
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public void shouldCountWords() throws Exception {
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Explicitly place the state directory under /tmp so that we can remove it via
// `purgeLocalStreamsState` below. Once Streams is updated to expose the effective
// StreamsConfig configuration (so we can retrieve whatever state directory Streams came up
Expand Down Expand Up @@ -115,11 +116,7 @@ public KeyValue<String, String> apply(String key, String value) {

KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

// Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
// of the input data we produce below).
Thread.sleep(5000);


//
// Step 2: Produce some input data to the input topic.
//
Expand Down