From ddd9c233f9b26f57e24d3ec6bc52094d12a46cdd Mon Sep 17 00:00:00 2001 From: Sanil15 Date: Thu, 20 Sep 2018 18:44:48 -0700 Subject: [PATCH 1/4] Cleaning up TestRunner Apis to incorporate LegacyTaskApplication and StreamApplication --- .../samza/test/framework/TestRunner.java | 67 ++++--------------- .../AsyncStreamTaskIntegrationTest.java | 11 +-- .../framework/StreamTaskIntegrationTest.java | 11 +-- 3 files changed, 24 insertions(+), 65 deletions(-) diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index 5c4ba3bc16..5e5b8c7b7c 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -31,9 +31,8 @@ import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.samza.SamzaException; +import org.apache.samza.application.LegacyTaskApplication; import org.apache.samza.application.SamzaApplication; -import org.apache.samza.application.StreamApplication; -import org.apache.samza.application.TaskApplication; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; @@ -56,17 +55,11 @@ import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.system.inmemory.InMemorySystemFactory; -import org.apache.samza.task.AsyncStreamTask; -import org.apache.samza.task.AsyncStreamTaskFactory; -import org.apache.samza.task.StreamTask; -import org.apache.samza.task.StreamTaskFactory; -import org.apache.samza.task.TaskFactory; import org.apache.samza.test.framework.system.InMemoryInputDescriptor; import org.apache.samza.test.framework.system.InMemoryOutputDescriptor; import org.apache.samza.test.framework.system.InMemorySystemDescriptor; import org.junit.Assert; - /** * TestRunner provides APIs to set up integration tests for a Samza application. * Running mode for test is Single container mode @@ -87,7 +80,7 @@ public class TestRunner { private Map configs; private Class taskClass; - private StreamApplication app; + private SamzaApplication app; /* * inMemoryScope is a unique global key per TestRunner, this key when configured with {@link InMemorySystemDescriptor} * provides an isolated state to run with in memory system @@ -106,43 +99,30 @@ private TestRunner() { /** * Constructs a new {@link TestRunner} from following components - * @param taskClass represent a class containing Samza job logic extending either {@link StreamTask} or {@link AsyncStreamTask} - */ - private TestRunner(Class taskClass) { - this(); - Preconditions.checkNotNull(taskClass); - configs.put(TaskConfig.TASK_CLASS(), taskClass.getName()); - this.taskClass = taskClass; - } - - /** - * Constructs a new {@link TestRunner} from following components - * @param app samza job implementing {@link StreamApplication} + * @param app samza job implementing {@link SamzaApplication} */ - private TestRunner(StreamApplication app) { + private TestRunner(SamzaApplication app) { this(); Preconditions.checkNotNull(app); this.app = app; } /** - * Creates an instance of {@link TestRunner} for Low Level Samza Api - * @param taskClass samza job extending either {@link StreamTask} or {@link AsyncStreamTask} + * Creates an instance of {@link TestRunner} for Legacy Samza Api + * @param taskApp legacy samza task application * @return this {@link TestRunner} */ - public static TestRunner of(Class taskClass) { - Preconditions.checkNotNull(taskClass); - Preconditions.checkState( - StreamTask.class.isAssignableFrom(taskClass) || AsyncStreamTask.class.isAssignableFrom(taskClass)); - return new TestRunner(taskClass); + public static TestRunner of(LegacyTaskApplication taskApp) { + Preconditions.checkNotNull(taskApp); + return new TestRunner(taskApp); } /** * Creates an instance of {@link TestRunner} for High Level/Fluent Samza Api - * @param app samza job implementing {@link StreamApplication} + * @param app samza job implementing {@link SamzaApplication} * @return this {@link TestRunner} */ - public static TestRunner of(StreamApplication app) { + public static TestRunner of(SamzaApplication app) { Preconditions.checkNotNull(app); return new TestRunner(app); } @@ -246,8 +226,7 @@ public void run(Duration timeout) { Preconditions.checkState((app == null && taskClass != null) || (app != null && taskClass == null), "TestRunner should run for Low Level Task api or High Level Application Api"); Preconditions.checkState(!timeout.isZero() || !timeout.isNegative(), "Timeouts should be positive"); - SamzaApplication testApp = app == null ? (TaskApplication) appDesc -> appDesc.setTaskFactory(createTaskFactory()) : app; - final LocalApplicationRunner runner = new LocalApplicationRunner(testApp, new MapConfig(configs)); + final LocalApplicationRunner runner = new LocalApplicationRunner(app, new MapConfig(configs)); runner.run(); boolean timedOut = !runner.waitForFinish(timeout); Assert.assertFalse("Timed out waiting for application to finish", timedOut); @@ -326,28 +305,6 @@ public static Map> consumeS entry -> entry.getValue().stream().map(e -> (StreamMessageType) e.getMessage()).collect(Collectors.toList()))); } - private TaskFactory createTaskFactory() { - if (StreamTask.class.isAssignableFrom(taskClass)) { - return (StreamTaskFactory) () -> { - try { - return (StreamTask) taskClass.newInstance(); - } catch (InstantiationException | IllegalAccessException e) { - throw new SamzaException(String.format("Failed to instantiate StreamTask class %s", taskClass.getName()), e); - } - }; - } else if (AsyncStreamTask.class.isAssignableFrom(taskClass)) { - return (AsyncStreamTaskFactory) () -> { - try { - return (AsyncStreamTask) taskClass.newInstance(); - } catch (InstantiationException | IllegalAccessException e) { - throw new SamzaException(String.format("Failed to instantiate AsyncStreamTask class %s", taskClass.getName()), e); - } - }; - } - throw new SamzaException(String.format("Not supported task.class %s. task.class has to implement either StreamTask " - + "or AsyncStreamTask", taskClass.getName())); - } - /** * Creates an in memory stream with {@link InMemorySystemFactory} and feeds its partition with stream of messages * @param partitonData key of the map represents partitionId and value represents diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java index 581b1c33c2..919e9fe54b 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.apache.samza.application.LegacyTaskApplication; import org.apache.samza.operators.KV; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.test.framework.system.InMemoryInputDescriptor; @@ -52,7 +53,7 @@ public void testAsyncTaskWithSinglePartition() throws Exception { .getOutputDescriptor("ints-out", new NoOpSerde<>()); TestRunner - .of(MyAsyncStreamTask.class) + .of(new LegacyTaskApplication(MyAsyncStreamTask.class.getName())) .addInputStream(imid, inputList) .addOutputStream(imod, 1) .run(Duration.ofSeconds(2)); @@ -75,7 +76,7 @@ public void testAsyncTaskWithSinglePartitionUsingStreamAssert() throws Exception .getOutputDescriptor("ints-out", new NoOpSerde<>()); TestRunner - .of(MyAsyncStreamTask.class) + .of(new LegacyTaskApplication(MyAsyncStreamTask.class.getName())) .addInputStream(imid, inputList) .addOutputStream(imod, 1) .run(Duration.ofSeconds(2)); @@ -97,7 +98,7 @@ public void testAsyncTaskWithMultiplePartition() throws Exception { .getOutputDescriptor("ints-out", new NoOpSerde<>()); TestRunner - .of(MyAsyncStreamTask.class) + .of(new LegacyTaskApplication(MyAsyncStreamTask.class.getName())) .addInputStream(imid, inputPartitionData) .addOutputStream(imod, 5) .run(Duration.ofSeconds(2)); @@ -120,7 +121,7 @@ public void testAsyncTaskWithMultiplePartitionMultithreaded() throws Exception { .getOutputDescriptor("ints-out", new NoOpSerde<>()); TestRunner - .of(MyAsyncStreamTask.class) + .of(new LegacyTaskApplication(MyAsyncStreamTask.class.getName())) .addInputStream(imid, inputPartitionData) .addOutputStream(imod, 5) .addOverrideConfig("task.max.concurrency", "4") @@ -156,7 +157,7 @@ public void testSamzaJobTimeoutFailureForAsyncTask() { .getOutputDescriptor("ints-out", new NoOpSerde<>()); TestRunner - .of(MyAsyncStreamTask.class) + .of(new LegacyTaskApplication(MyAsyncStreamTask.class.getName())) .addInputStream(imid, Arrays.asList(1, 2, 3, 4)) .addOutputStream(imod, 1) .run(Duration.ofMillis(1)); diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java index 0580598652..bf9eb92ed3 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.stream.Collectors; import org.apache.samza.SamzaException; +import org.apache.samza.application.LegacyTaskApplication; import org.apache.samza.operators.KV; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.test.framework.system.InMemoryInputDescriptor; @@ -53,7 +54,7 @@ public void testSyncTaskWithSinglePartition() throws Exception { .getOutputDescriptor("output", new NoOpSerde()); TestRunner - .of(MyStreamTestTask.class) + .of(new LegacyTaskApplication(MyStreamTestTask.class.getName())) .addInputStream(imid, inputList) .addOutputStream(imod, 1) .run(Duration.ofSeconds(1)); @@ -79,7 +80,7 @@ public void testSamzaJobFailureForSyncTask() { .getOutputDescriptor("output", new NoOpSerde<>()); TestRunner - .of(MyStreamTestTask.class) + .of(new LegacyTaskApplication(MyStreamTestTask.class.getName())) .addInputStream(imid, inputList) .addOutputStream(imod, 1) .run(Duration.ofSeconds(1)); @@ -99,7 +100,7 @@ public void testSyncTaskWithSinglePartitionMultithreaded() throws Exception { .getOutputDescriptor("output", new NoOpSerde()); TestRunner - .of(MyStreamTestTask.class) + .of(new LegacyTaskApplication(MyStreamTestTask.class.getName())) .addInputStream(imid, inputList) .addOutputStream(imod, 1) .addOverrideConfig("job.container.thread.pool.size", "4") @@ -123,7 +124,7 @@ public void testSyncTaskWithMultiplePartition() throws Exception { .getOutputDescriptor("output", new NoOpSerde()); TestRunner - .of(MyStreamTestTask.class) + .of(new LegacyTaskApplication(MyStreamTestTask.class.getName())) .addInputStream(imid, inputPartitionData) .addOutputStream(imod, 5) .run(Duration.ofSeconds(2)); @@ -146,7 +147,7 @@ public void testSyncTaskWithMultiplePartitionMultithreaded() throws Exception { .getOutputDescriptor("output", new NoOpSerde()); TestRunner - .of(MyStreamTestTask.class) + .of(new LegacyTaskApplication(MyStreamTestTask.class.getName())) .addInputStream(imid, inputPartitionData) .addOutputStream(imod, 5) .addOverrideConfig("job.container.thread.pool.size", "4") From fbb9354cb174a4d75d18b63805f9a1f628de5674 Mon Sep 17 00:00:00 2001 From: Sanil15 Date: Fri, 21 Sep 2018 13:29:42 -0700 Subject: [PATCH 2/4] Addressing review --- .../java/org/apache/samza/test/framework/TestRunner.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index 5e5b8c7b7c..e2338dc04c 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -99,7 +99,7 @@ private TestRunner() { /** * Constructs a new {@link TestRunner} from following components - * @param app samza job implementing {@link SamzaApplication} + * @param app a {@link SamzaApplication} */ private TestRunner(SamzaApplication app) { this(); @@ -109,7 +109,7 @@ private TestRunner(SamzaApplication app) { /** * Creates an instance of {@link TestRunner} for Legacy Samza Api - * @param taskApp legacy samza task application + * @param taskApp a {@link LegacyTaskApplication} * @return this {@link TestRunner} */ public static TestRunner of(LegacyTaskApplication taskApp) { @@ -118,8 +118,8 @@ public static TestRunner of(LegacyTaskApplication taskApp) { } /** - * Creates an instance of {@link TestRunner} for High Level/Fluent Samza Api - * @param app samza job implementing {@link SamzaApplication} + * Creates an instance of {@link TestRunner} for a {@link SamzaApplication} + * @param app a {@link SamzaApplication} * @return this {@link TestRunner} */ public static TestRunner of(SamzaApplication app) { From 792f575a7844df1bd2ea14813cda113ce70fc859 Mon Sep 17 00:00:00 2001 From: Sanil15 Date: Thu, 27 Sep 2018 16:56:18 -0700 Subject: [PATCH 3/4] Reverting changes --- .../framework/AsyncStreamTaskIntegrationTest.java | 13 +++++-------- .../test/framework/StreamTaskIntegrationTest.java | 13 ++++++------- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java index 919e9fe54b..ef9508a20f 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import org.apache.samza.application.LegacyTaskApplication; import org.apache.samza.operators.KV; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.test.framework.system.InMemoryInputDescriptor; @@ -53,7 +52,7 @@ public void testAsyncTaskWithSinglePartition() throws Exception { .getOutputDescriptor("ints-out", new NoOpSerde<>()); TestRunner - .of(new LegacyTaskApplication(MyAsyncStreamTask.class.getName())) + .of(MyAsyncStreamTask.class) .addInputStream(imid, inputList) .addOutputStream(imod, 1) .run(Duration.ofSeconds(2)); @@ -76,7 +75,7 @@ public void testAsyncTaskWithSinglePartitionUsingStreamAssert() throws Exception .getOutputDescriptor("ints-out", new NoOpSerde<>()); TestRunner - .of(new LegacyTaskApplication(MyAsyncStreamTask.class.getName())) + .of(MyAsyncStreamTask.class) .addInputStream(imid, inputList) .addOutputStream(imod, 1) .run(Duration.ofSeconds(2)); @@ -98,7 +97,7 @@ public void testAsyncTaskWithMultiplePartition() throws Exception { .getOutputDescriptor("ints-out", new NoOpSerde<>()); TestRunner - .of(new LegacyTaskApplication(MyAsyncStreamTask.class.getName())) + .of(MyAsyncStreamTask.class) .addInputStream(imid, inputPartitionData) .addOutputStream(imod, 5) .run(Duration.ofSeconds(2)); @@ -121,7 +120,7 @@ public void testAsyncTaskWithMultiplePartitionMultithreaded() throws Exception { .getOutputDescriptor("ints-out", new NoOpSerde<>()); TestRunner - .of(new LegacyTaskApplication(MyAsyncStreamTask.class.getName())) + .of(MyAsyncStreamTask.class) .addInputStream(imid, inputPartitionData) .addOutputStream(imod, 5) .addOverrideConfig("task.max.concurrency", "4") @@ -157,11 +156,9 @@ public void testSamzaJobTimeoutFailureForAsyncTask() { .getOutputDescriptor("ints-out", new NoOpSerde<>()); TestRunner - .of(new LegacyTaskApplication(MyAsyncStreamTask.class.getName())) + .of(MyAsyncStreamTask.class) .addInputStream(imid, Arrays.asList(1, 2, 3, 4)) .addOutputStream(imod, 1) .run(Duration.ofMillis(1)); } - - } diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java index bf9eb92ed3..bc5cba7144 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java @@ -27,7 +27,6 @@ import java.util.Map; import java.util.stream.Collectors; import org.apache.samza.SamzaException; -import org.apache.samza.application.LegacyTaskApplication; import org.apache.samza.operators.KV; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.test.framework.system.InMemoryInputDescriptor; @@ -54,7 +53,7 @@ public void testSyncTaskWithSinglePartition() throws Exception { .getOutputDescriptor("output", new NoOpSerde()); TestRunner - .of(new LegacyTaskApplication(MyStreamTestTask.class.getName())) + .of(MyStreamTestTask.class) .addInputStream(imid, inputList) .addOutputStream(imod, 1) .run(Duration.ofSeconds(1)); @@ -80,7 +79,7 @@ public void testSamzaJobFailureForSyncTask() { .getOutputDescriptor("output", new NoOpSerde<>()); TestRunner - .of(new LegacyTaskApplication(MyStreamTestTask.class.getName())) + .of(MyStreamTestTask.class) .addInputStream(imid, inputList) .addOutputStream(imod, 1) .run(Duration.ofSeconds(1)); @@ -100,7 +99,7 @@ public void testSyncTaskWithSinglePartitionMultithreaded() throws Exception { .getOutputDescriptor("output", new NoOpSerde()); TestRunner - .of(new LegacyTaskApplication(MyStreamTestTask.class.getName())) + .of(MyStreamTestTask.class) .addInputStream(imid, inputList) .addOutputStream(imod, 1) .addOverrideConfig("job.container.thread.pool.size", "4") @@ -124,7 +123,7 @@ public void testSyncTaskWithMultiplePartition() throws Exception { .getOutputDescriptor("output", new NoOpSerde()); TestRunner - .of(new LegacyTaskApplication(MyStreamTestTask.class.getName())) + .of(MyStreamTestTask.class) .addInputStream(imid, inputPartitionData) .addOutputStream(imod, 5) .run(Duration.ofSeconds(2)); @@ -147,7 +146,7 @@ public void testSyncTaskWithMultiplePartitionMultithreaded() throws Exception { .getOutputDescriptor("output", new NoOpSerde()); TestRunner - .of(new LegacyTaskApplication(MyStreamTestTask.class.getName())) + .of(MyStreamTestTask.class) .addInputStream(imid, inputPartitionData) .addOutputStream(imod, 5) .addOverrideConfig("job.container.thread.pool.size", "4") @@ -168,4 +167,4 @@ public void genData(Map> inputPartitionData, Map(outputPartition)); } } -} +} \ No newline at end of file From f0ad565b0d623c70d8807d794efe29f5e7224a89 Mon Sep 17 00:00:00 2001 From: Sanil15 Date: Fri, 28 Sep 2018 12:43:13 -0700 Subject: [PATCH 4/4] Reverting the reverted changes --- .../main/java/org/apache/samza/test/framework/TestRunner.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index fda8f559ad..fe8581b7b7 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -100,7 +100,7 @@ private TestRunner() { /** * Constructs a new {@link TestRunner} from following components - * @param taskClass represent a class containing Samza job logic extending either {@link StreamTask} or {@link AsyncStreamTask} + * @param taskClass containing Samza job logic extending either {@link StreamTask} or {@link AsyncStreamTask} */ private TestRunner(Class taskClass) { this(); @@ -111,7 +111,7 @@ private TestRunner(Class taskClass) { /** * Constructs a new {@link TestRunner} from following components - * @param app samza job implementing {@link SamzaApplication} + * @param app a {@link SamzaApplication} */ private TestRunner(SamzaApplication app) { this();