Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion clients/src/test/java/org/apache/kafka/test/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -29,6 +31,7 @@
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;


/**
Expand Down Expand Up @@ -97,12 +100,44 @@ public static String randomString(int len) {
}

/**
* Creates an empty file in the default temporary-file directory, using `kafka` as the prefix and `tmp` as the
* Create an empty file in the default temporary-file directory, using `kafka` as the prefix and `tmp` as the
* suffix to generate its name.
*/
public static File tempFile() throws IOException {
File file = File.createTempFile("kafka", ".tmp");
file.deleteOnExit();

return file;
}

/**
* Create a temporary relative directory in the default temporary-file directory with the given prefix.
*
* @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix
*/
public static File tempDirectory(String prefix) throws IOException {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should update the Scala TestUtils to call this method.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack.

return tempDirectory(null, prefix);
}

/**
* Create a temporary relative directory in the specified parent directory with the given prefix.
*
* @param parent The parent folder path name, if null using the default temporary-file directory
* @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix
*/
public static File tempDirectory(Path parent, String prefix) throws IOException {
final File file = parent == null ?
Files.createTempDirectory(prefix == null ? "kafka-" : prefix).toFile() :
Files.createTempDirectory(parent, prefix == null ? "kafka-" : prefix).toFile();
file.deleteOnExit();

Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
Utils.delete(file);
}
});

return file;
}

Expand Down
9 changes: 1 addition & 8 deletions core/src/test/scala/unit/kafka/utils/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,8 @@ object TestUtils extends Logging {
def tempRelativeDir(parent: String): File = {
val parentFile = new File(parent)
parentFile.mkdirs()
val f = Files.createTempDirectory(parentFile.toPath, "kafka-").toFile
f.deleteOnExit()

Runtime.getRuntime().addShutdownHook(new Thread() {
override def run() = {
Utils.delete(f)
}
})
f
org.apache.kafka.test.TestUtils.tempDirectory(parentFile.toPath, "kafka-");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it not be better to use kafka- as the prefix by default in o.a.k.t.TestUtils.tempDirectory?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. Changing kafka to kafka-.

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,23 @@
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.After;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class KStreamBuilderTest {

private KStreamTestDriver driver = null;

@After
public void cleanup() {
if (driver != null) {
driver.close();
}
driver = null;
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code seems to be repeated in a lot of tests. Is there a reason why we don't introduce a superclass for initialisation and clean-up of KStreamTestDriver?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also make KStreamTestDriver a JUnit ExternalResource so that the shutdown is done automatically.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExternalResource: +1 (assuming it'll work that way)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ijuma @miguno I thought about that. But one tricky thing is that KStreamTestDriver usually needs to be constructed within the test case since its construction parameters are dynamic, and hence cannot define it outside the test case. Suggestions welcomed.

Copy link
Copy Markdown
Contributor

@miguno miguno Apr 25, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edit: Ah, I think you mean that we need to pass builder to the constructor of KStreamTestDriver, which (in this case, unfortunately) is being mutated by subsequent calls such as builder.stream() or stream.branch() before we pass it to the driver constructor?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps @dguy has an idea whether it's possible to use KStreamTestDriver as an ExternalResource even though, for constructing an instance of the former, we need to pass a test case specific parameter (here: builder aka KStreamBuilder instance) to it for each test.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is possible, but you'd need to make a few changes to KStreamTestDriver , i.e., everything currently in the constructor would need to be moved into a method that can be called from the tests. So you'd end up having an overloaded initiliaze method instead of the various constructors.


@Test(expected = TopologyBuilderException.class)
public void testFrom() {
final KStreamBuilder builder = new KStreamBuilder();
Expand Down Expand Up @@ -66,7 +77,7 @@ public void testMerge() {
MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
merged.process(processorSupplier);

KStreamTestDriver driver = new KStreamTestDriver(builder);
driver = new KStreamTestDriver(builder);
driver.setTime(0L);

driver.process(topic1, "A", "aa");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,17 @@
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.After;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;

Expand All @@ -43,12 +41,7 @@ public class KGroupedTableImplTest {

@Before
public void setUp() throws IOException {
stateDir = Files.createTempDirectory("test").toFile();
}

@After
public void tearDown() throws IOException {
Utils.delete(stateDir);
stateDir = TestUtils.tempDirectory("kafka-test");
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.After;
import org.junit.Test;

import java.lang.reflect.Array;
Expand All @@ -33,6 +34,16 @@ public class KStreamBranchTest {

private String topicName = "topic";

private KStreamTestDriver driver = null;

@After
public void cleanup() {
if (driver != null) {
driver.close();
}
driver = null;
}

@SuppressWarnings("unchecked")
@Test
public void testKStreamBranch() {
Expand Down Expand Up @@ -74,7 +85,7 @@ public boolean test(Integer key, String value) {
branches[i].process(processors[i]);
}

KStreamTestDriver driver = new KStreamTestDriver(builder);
driver = new KStreamTestDriver(builder);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;

import org.junit.After;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
Expand All @@ -32,6 +33,16 @@ public class KStreamFilterTest {

private String topicName = "topic";

private KStreamTestDriver driver = null;

@After
public void cleanup() {
if (driver != null) {
driver.close();
}
driver = null;
}

private Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
@Override
public boolean test(Integer key, String value) {
Expand All @@ -51,7 +62,7 @@ public void testFilter() {
stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
stream.filter(isMultipleOfThree).process(processor);

KStreamTestDriver driver = new KStreamTestDriver(builder);
driver = new KStreamTestDriver(builder);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
}
Expand All @@ -71,7 +82,7 @@ public void testFilterNot() {
stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
stream.filterNot(isMultipleOfThree).process(processor);

KStreamTestDriver driver = new KStreamTestDriver(builder);
driver = new KStreamTestDriver(builder);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.After;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
Expand All @@ -34,6 +35,16 @@ public class KStreamFlatMapTest {

private String topicName = "topic";

private KStreamTestDriver driver = null;

@After
public void cleanup() {
if (driver != null) {
driver.close();
}
driver = null;
}

@Test
public void testFlatMap() {
KStreamBuilder builder = new KStreamBuilder();
Expand All @@ -59,7 +70,7 @@ public Iterable<KeyValue<String, String>> apply(Integer key, String value) {
stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
stream.flatMap(mapper).process(processor);

KStreamTestDriver driver = new KStreamTestDriver(builder);
driver = new KStreamTestDriver(builder);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.After;
import org.junit.Test;

import java.util.ArrayList;
Expand All @@ -34,6 +35,16 @@ public class KStreamFlatMapValuesTest {

private String topicName = "topic";

private KStreamTestDriver driver = null;

@After
public void cleanup() {
if (driver != null) {
driver.close();
}
driver = null;
}

@Test
public void testFlatMapValues() {
KStreamBuilder builder = new KStreamBuilder();
Expand All @@ -58,7 +69,7 @@ public Iterable<String> apply(String value) {
stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
stream.flatMapValues(mapper).process(processor);

KStreamTestDriver driver = new KStreamTestDriver(builder);
driver = new KStreamTestDriver(builder);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.test.KStreamTestDriver;
import org.junit.After;
import org.junit.Test;
import java.util.List;
import java.util.Locale;
Expand All @@ -39,6 +40,16 @@ public class KStreamForeachTest {
final private Serde<Integer> intSerde = Serdes.Integer();
final private Serde<String> stringSerde = Serdes.String();

private KStreamTestDriver driver = null;

@After
public void cleanup() {
if (driver != null) {
driver.close();
}
driver = null;
}

@Test
public void testForeach() {
// Given
Expand Down Expand Up @@ -71,7 +82,7 @@ public void apply(Integer key, String value) {
stream.foreach(action);

// Then
KStreamTestDriver driver = new KStreamTestDriver(builder);
driver = new KStreamTestDriver(builder);
for (KeyValue<Integer, String> record: inputRecords) {
driver.process(topicName, record.key, record.value);
}
Expand Down
Loading