diff --git a/kafka-streams/src/test/scala/io/confluent/examples/streams/GenericAvroScalaIntegrationTest.scala b/kafka-streams/src/test/scala/io/confluent/examples/streams/GenericAvroScalaIntegrationTest.scala index f1051c6a..1b037133 100644 --- a/kafka-streams/src/test/scala/io/confluent/examples/streams/GenericAvroScalaIntegrationTest.scala +++ b/kafka-streams/src/test/scala/io/confluent/examples/streams/GenericAvroScalaIntegrationTest.scala @@ -37,19 +37,24 @@ import org.scalatest.junit.AssertionsForJUnit */ class GenericAvroScalaIntegrationTest extends AssertionsForJUnit { - private val privateCluster: EmbeddedSingleNodeKafkaCluster = new EmbeddedSingleNodeKafkaCluster - - @Rule def cluster: EmbeddedSingleNodeKafkaCluster = privateCluster - + private var cluster: EmbeddedSingleNodeKafkaCluster = _ + private val inputTopic = "inputTopic" private val outputTopic = "output-topic" @Before def startKafkaCluster() { + cluster = new EmbeddedSingleNodeKafkaCluster + cluster.start() cluster.createTopic(inputTopic, 2, 1) cluster.createTopic(outputTopic) } + @After + def stopKafkaCluster(): Unit = { + cluster.stop() + } + @Test def shouldRoundTripGenericAvroDataThroughKafka() { val schema: Schema = new Schema.Parser().parse(getClass.getResourceAsStream("/avro/io/confluent/examples/streams/wikifeed.avsc"))