From 5bbacddbcad86d28f29e093cf0873fe05d82918a Mon Sep 17 00:00:00 2001 From: Sanil15 Date: Thu, 20 Sep 2018 16:33:52 -0700 Subject: [PATCH 1/7] Adding default job system in TestRunner, disabling hostaffinity to support TableDescriptors and refining addConfig method for TestRunner API --- .../samza/test/framework/TestRunner.java | 60 +++++++++++++------ .../AsyncStreamTaskIntegrationTest.java | 2 +- .../StreamApplicationIntegrationTest.java | 23 ------- .../framework/StreamTaskIntegrationTest.java | 4 +- .../table/TestLocalTableWithSideInputs.java | 8 +-- 5 files changed, 45 insertions(+), 52 deletions(-) diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index 5c4ba3bc16..716cadb2d9 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -34,6 +34,7 @@ import org.apache.samza.application.SamzaApplication; import org.apache.samza.application.StreamApplication; import org.apache.samza.application.TaskApplication; +import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; @@ -79,10 +80,13 @@ *
  • "task.name.grouper.factory" = {@link SingleContainerGrouperFactory}
  • *
  • "job.name" = "test-samza"
  • *
  • "processor.id" = "1"
  • + *
  • "job.default.system" = {@code JOB_DEFAULT_SYSTEM}
  • + *
  • "job.host-affinity.enabled" = "false"
  • * * */ public class TestRunner { + public static final String JOB_DEFAULT_SYSTEM = "default-samza-system"; public static final String JOB_NAME = "samza-test"; private Map configs; @@ -102,11 +106,15 @@ private TestRunner() { configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); + configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM); + // This is important because Table Api enables host affinity by default for RocksDb + addConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString()); + putIfAbsentConfig(new InMemorySystemDescriptor(JOB_DEFAULT_SYSTEM).toConfig()); } /** * Constructs a new {@link TestRunner} from following components - * @param taskClass represent a class containing Samza job logic extending either {@link StreamTask} or {@link AsyncStreamTask} + * @param taskClass class containing Samza job logic extending either {@link StreamTask} or {@link AsyncStreamTask} */ private TestRunner(Class taskClass) { this(); @@ -115,7 +123,7 @@ private TestRunner(Class taskClass) { this.taskClass = taskClass; } - /** + /**TestLocalTableWithSideInputs * Constructs a new {@link TestRunner} from following components * @param app samza job implementing {@link StreamApplication} */ @@ -148,28 +156,31 @@ public static TestRunner of(StreamApplication app) { } /** - * Only adds a config from {@code config} to samza job {@code configs} if they dont exist in it. - * @param config configs for the application + * Adds a config to Samza application, this config takes precedence over default configs and descriptor generated configs + * + * @param key of the config + * @param value of the config * @return this {@link TestRunner} */ - public TestRunner addConfigs(Map config) { - Preconditions.checkNotNull(config); - config.forEach(this.configs::putIfAbsent); + public TestRunner addConfig(String key, String value) { + Preconditions.checkNotNull(key); + Preconditions.checkNotNull(value); + String configKeyPrefix = String.format(JobConfig.CONFIG_JOB_PREFIX(), JOB_NAME); + configs.put(String.format("%s%s", configKeyPrefix, key), value); return this; } /** - * Adds a config to {@code configs} if its not already present. Overrides a config value for which key is already - * exisiting in {@code configs} - * @param key key of the config - * @param value value of the config + * Adds {@code config} to Samza application, these configs takes precedence over default configs + * and descriptor generated configs + * + * @param config configuration for samza application * @return this {@link TestRunner} */ - public TestRunner addOverrideConfig(String key, String value) { - Preconditions.checkNotNull(key); - Preconditions.checkNotNull(value); + public TestRunner addConfig(Config config) { + Preconditions.checkNotNull(config); String configKeyPrefix = String.format(JobConfig.CONFIG_JOB_PREFIX(), JOB_NAME); - configs.put(String.format("%s%s", configKeyPrefix, key), value); + config.forEach((key, value) -> configs.put(String.format("%s%s", configKeyPrefix, key), value)); return this; } @@ -229,8 +240,8 @@ public TestRunner addOutputStream(InMemoryOutputDescriptor streamDescriptor, int factory .getAdmin(streamDescriptor.getSystemName(), config) .createStream(spec); - addConfigs(streamDescriptor.toConfig()); - addConfigs(streamDescriptor.getSystemDescriptor().toConfig()); + putIfAbsentConfig(streamDescriptor.toConfig()); + putIfAbsentConfig(streamDescriptor.getSystemDescriptor().toConfig()); return this; } @@ -366,8 +377,8 @@ private void initializeInMemoryInputStream(InMemoryInputDesc } InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) descriptor.getSystemDescriptor(); imsd.withInMemoryScope(this.inMemoryScope); - addConfigs(descriptor.toConfig()); - addConfigs(descriptor.getSystemDescriptor().toConfig()); + putIfAbsentConfig(descriptor.toConfig()); + putIfAbsentConfig(descriptor.getSystemDescriptor().toConfig()); StreamSpec spec = new StreamSpec(descriptor.getStreamId(), streamName, systemName, partitonData.size()); SystemFactory factory = new InMemorySystemFactory(); Config config = new MapConfig(descriptor.toConfig(), descriptor.getSystemDescriptor().toConfig()); @@ -384,4 +395,15 @@ private void initializeInMemoryInputStream(InMemoryInputDesc new EndOfStreamMessage(null))); }); } + + /** + * Only adds a config from {@code config} to samza job {@code configs} if they dont exist in it. + * @param config for the application + * @return this {@link TestRunner} + */ + private TestRunner putIfAbsentConfig(Map config) { + Preconditions.checkNotNull(config); + config.forEach(this.configs::putIfAbsent); + return this; + } } diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java index 581b1c33c2..8d7e9d2522 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java @@ -123,7 +123,7 @@ public void testAsyncTaskWithMultiplePartitionMultithreaded() throws Exception { .of(MyAsyncStreamTask.class) .addInputStream(imid, inputPartitionData) .addOutputStream(imod, 5) - .addOverrideConfig("task.max.concurrency", "4") + .addConfig("task.max.concurrency", "4") .run(Duration.ofSeconds(2)); StreamAssert.containsInAnyOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000)); diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java index 6dd91590eb..4ebe95f3b7 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java @@ -96,7 +96,6 @@ public void testHighLevelApi() throws Exception { .of(pageViewRepartition) .addInputStream(imid, pageviews) .addOutputStream(imod, 10) - .addOverrideConfig("job.default.system", "test") .run(Duration.ofMillis(1500)); Assert.assertEquals(TestRunner.consumeStream(imod, Duration.ofMillis(1000)).get(random.nextInt(count)).size(), 1); @@ -108,27 +107,6 @@ public static > MapFunction create() { } } - /** - * Job should fail since it is missing config "job.default.system" for partitionBy Operator - */ - @Test(expected = SamzaException.class) - public void testSamzaJobStartMissingConfigFailureForStreamApplication() { - - InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test"); - - InMemoryInputDescriptor imid = isd - .getInputDescriptor("PageView", new NoOpSerde()); - - InMemoryOutputDescriptor imod = isd - .getOutputDescriptor("Output", new NoOpSerde()); - - TestRunner - .of(pageViewRepartition) - .addInputStream(imid, new ArrayList<>()) - .addOutputStream(imod, 10) - .run(Duration.ofMillis(1000)); - } - /** * Null page key is passed in input data which should fail filter logic */ @@ -154,7 +132,6 @@ public void testSamzaJobFailureForStreamApplication() { TestRunner.of(pageViewFilter) .addInputStream(imid, pageviews) .addOutputStream(imod, 10) - .addOverrideConfig("job.default.system", "test") .run(Duration.ofMillis(1000)); } diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java index 0580598652..2360d1c979 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java @@ -102,7 +102,7 @@ public void testSyncTaskWithSinglePartitionMultithreaded() throws Exception { .of(MyStreamTestTask.class) .addInputStream(imid, inputList) .addOutputStream(imod, 1) - .addOverrideConfig("job.container.thread.pool.size", "4") + .addConfig("job.container.thread.pool.size", "4") .run(Duration.ofSeconds(1)); StreamAssert.containsInOrder(outputList, imod, Duration.ofMillis(1000)); @@ -149,7 +149,7 @@ public void testSyncTaskWithMultiplePartitionMultithreaded() throws Exception { .of(MyStreamTestTask.class) .addInputStream(imid, inputPartitionData) .addOutputStream(imod, 5) - .addOverrideConfig("job.container.thread.pool.size", "4") + .addConfig("job.container.thread.pool.size", "4") .run(Duration.ofSeconds(2)); StreamAssert.containsInOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000)); diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java index 3c22818919..9866c6f722 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java @@ -30,10 +30,8 @@ import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.StreamConfig; -import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.operators.KV; import org.apache.samza.operators.TableDescriptor; import org.apache.samza.serializers.IntegerSerde; @@ -85,7 +83,6 @@ private void runTest(String systemName, StreamApplication app, List pa configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM), systemName); configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PROFILE_STREAM), systemName); configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), ENRICHED_PAGEVIEW_STREAM), systemName); - configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName); InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName); @@ -103,8 +100,7 @@ private void runTest(String systemName, StreamApplication app, List pa .addInputStream(pageViewStreamDesc, pageViews) .addInputStream(profileStreamDesc, profiles) .addOutputStream(outputStreamDesc, 1) - .addConfigs(new MapConfig(configs)) - .addOverrideConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString()) + .addConfig(new MapConfig(configs)) .run(Duration.ofMillis(100000)); try { @@ -148,7 +144,6 @@ public void describe(StreamApplicationDescriptor appDesc) { .withSideInputsProcessor((msg, store) -> { Profile profile = (Profile) msg.getMessage(); int key = profile.getMemberId(); - return ImmutableList.of(new Entry<>(key, profile)); }); } @@ -162,7 +157,6 @@ static class DurablePageViewProfileJoin extends PageViewProfileJoin { .withSideInputsProcessor((msg, store) -> { TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage(); int key = profile.getMemberId(); - return ImmutableList.of(new Entry<>(key, profile)); }); } From 57386250cfcd22699db49dab43a19d54208b685f Mon Sep 17 00:00:00 2001 From: Sanil15 Date: Thu, 20 Sep 2018 16:52:06 -0700 Subject: [PATCH 2/7] Correcting a typo --- .../main/java/org/apache/samza/test/framework/TestRunner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index 716cadb2d9..23c379459b 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -123,7 +123,7 @@ private TestRunner(Class taskClass) { this.taskClass = taskClass; } - /**TestLocalTableWithSideInputs + /** * Constructs a new {@link TestRunner} from following components * @param app samza job implementing {@link StreamApplication} */ From 0641242647c14db2b53e0a6bae7efd393ae101ee Mon Sep 17 00:00:00 2001 From: Sanil15 Date: Fri, 21 Sep 2018 12:33:29 -0700 Subject: [PATCH 3/7] Addressing Review --- .../org/apache/samza/test/framework/TestRunner.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index 23c379459b..a128af4032 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -36,6 +36,7 @@ import org.apache.samza.application.TaskApplication; import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.config.Config; +import org.apache.samza.config.InMemorySystemConfig; import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.MapConfig; @@ -109,6 +110,7 @@ private TestRunner() { configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM); // This is important because Table Api enables host affinity by default for RocksDb addConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString()); + addConfig(InMemorySystemConfig.INMEMORY_SCOPE, inMemoryScope); putIfAbsentConfig(new InMemorySystemDescriptor(JOB_DEFAULT_SYSTEM).toConfig()); } @@ -156,7 +158,7 @@ public static TestRunner of(StreamApplication app) { } /** - * Adds a config to Samza application, this config takes precedence over default configs and descriptor generated configs + * Adds a config to Samza application. This config takes precedence over default configs and descriptor generated configs * * @param key of the config * @param value of the config @@ -171,16 +173,15 @@ public TestRunner addConfig(String key, String value) { } /** - * Adds {@code config} to Samza application, these configs takes precedence over default configs + * Adds {@code config} to Samza application. These configs takes precedence over default configs * and descriptor generated configs * * @param config configuration for samza application * @return this {@link TestRunner} */ - public TestRunner addConfig(Config config) { + public TestRunner addConfig(Map config) { Preconditions.checkNotNull(config); - String configKeyPrefix = String.format(JobConfig.CONFIG_JOB_PREFIX(), JOB_NAME); - config.forEach((key, value) -> configs.put(String.format("%s%s", configKeyPrefix, key), value)); + config.forEach((key, value) -> addConfig(key, value)); return this; } From 3c1167b9850f6d5f5296903303625fbdb10e99d2 Mon Sep 17 00:00:00 2001 From: Sanil15 Date: Fri, 21 Sep 2018 12:37:02 -0700 Subject: [PATCH 4/7] Addressing a review --- .../main/java/org/apache/samza/test/framework/TestRunner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index a128af4032..d3b06f60e3 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -107,7 +107,7 @@ private TestRunner() { configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); - configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM); + addConfig(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM); // This is important because Table Api enables host affinity by default for RocksDb addConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString()); addConfig(InMemorySystemConfig.INMEMORY_SCOPE, inMemoryScope); From f44a472bbfaf78a590aa142bcae0bb9a0279b251 Mon Sep 17 00:00:00 2001 From: Sanil15 Date: Tue, 25 Sep 2018 12:43:07 -0700 Subject: [PATCH 5/7] Adressing review --- .../samza/test/framework/TestRunner.java | 31 +++++++------------ 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index d3b06f60e3..e272918358 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -111,7 +111,7 @@ private TestRunner() { // This is important because Table Api enables host affinity by default for RocksDb addConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString()); addConfig(InMemorySystemConfig.INMEMORY_SCOPE, inMemoryScope); - putIfAbsentConfig(new InMemorySystemDescriptor(JOB_DEFAULT_SYSTEM).toConfig()); + new InMemorySystemDescriptor(JOB_DEFAULT_SYSTEM).toConfig().forEach(this.configs::putIfAbsent); } /** @@ -205,7 +205,8 @@ public TestRunner addInputStream(InMemoryInputDescriptor des } /** - * Adds the provided input stream with mock data to the test application. + * Adds the provided input stream with mock data to the test application. Default configs and user added configs have + * a higher precedence over system and stream descriptor generated configs. * @param descriptor describes the stream that is supposed to be input to Samza application * @param messages map whose key is partitionId and value is messages in the partition * @param message with null key or a KV {@link org.apache.samza.operators.KV}. @@ -223,12 +224,13 @@ public TestRunner addInputStream(InMemoryInputDescriptor des } /** - * Adds the provided output stream to the test application. + * Adds the provided output stream to the test application. Default configs and user added configs have a higher + * precedence over system and stream descriptor generated configs. * @param streamDescriptor describes the stream that is supposed to be output for the Samza application * @param partitionCount partition count of output stream * @return this {@link TestRunner} */ - public TestRunner addOutputStream(InMemoryOutputDescriptor streamDescriptor, int partitionCount) { + public TestRunner addOutputStream(InMemoryOutputDescriptor streamDescriptor, int partitionCount) { Preconditions.checkNotNull(streamDescriptor); Preconditions.checkState(partitionCount >= 1); InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) streamDescriptor.getSystemDescriptor(); @@ -241,8 +243,8 @@ public TestRunner addOutputStream(InMemoryOutputDescriptor streamDescriptor, int factory .getAdmin(streamDescriptor.getSystemName(), config) .createStream(spec); - putIfAbsentConfig(streamDescriptor.toConfig()); - putIfAbsentConfig(streamDescriptor.getSystemDescriptor().toConfig()); + streamDescriptor.toConfig().forEach(this.configs::putIfAbsent); + ((Map) streamDescriptor.getSystemDescriptor().toConfig()).forEach(this.configs::putIfAbsent); return this; } @@ -366,7 +368,7 @@ private TaskFactory createTaskFactory() { * messages in the partition * @param descriptor describes a stream to initialize with the in memory system */ - private void initializeInMemoryInputStream(InMemoryInputDescriptor descriptor, + private void initializeInMemoryInputStream(InMemoryInputDescriptor descriptor, Map> partitonData) { String systemName = descriptor.getSystemName(); String streamName = (String) descriptor.getPhysicalName().orElse(descriptor.getStreamId()); @@ -378,8 +380,8 @@ private void initializeInMemoryInputStream(InMemoryInputDesc } InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) descriptor.getSystemDescriptor(); imsd.withInMemoryScope(this.inMemoryScope); - putIfAbsentConfig(descriptor.toConfig()); - putIfAbsentConfig(descriptor.getSystemDescriptor().toConfig()); + descriptor.toConfig().forEach(this.configs::putIfAbsent); + ((Map) descriptor.getSystemDescriptor().toConfig()).forEach(this.configs::putIfAbsent); StreamSpec spec = new StreamSpec(descriptor.getStreamId(), streamName, systemName, partitonData.size()); SystemFactory factory = new InMemorySystemFactory(); Config config = new MapConfig(descriptor.toConfig(), descriptor.getSystemDescriptor().toConfig()); @@ -396,15 +398,4 @@ private void initializeInMemoryInputStream(InMemoryInputDesc new EndOfStreamMessage(null))); }); } - - /** - * Only adds a config from {@code config} to samza job {@code configs} if they dont exist in it. - * @param config for the application - * @return this {@link TestRunner} - */ - private TestRunner putIfAbsentConfig(Map config) { - Preconditions.checkNotNull(config); - config.forEach(this.configs::putIfAbsent); - return this; - } } From 142a3f870bf7550daf3ee4e2b1768d4b75db0ade Mon Sep 17 00:00:00 2001 From: Sanil15 Date: Fri, 28 Sep 2018 12:32:44 -0700 Subject: [PATCH 6/7] Changing visibility of static variables --- .../main/java/org/apache/samza/test/framework/TestRunner.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index 6c4a553eb4..4b7e2ed5ab 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -84,8 +84,8 @@ * */ public class TestRunner { - public static final String JOB_DEFAULT_SYSTEM = "default-samza-system"; - public static final String JOB_NAME = "samza-test"; + private static final String JOB_DEFAULT_SYSTEM = "default-samza-system"; + private static final String JOB_NAME = "samza-test"; private Map configs; private SamzaApplication app; From f5c58213e40458a45d3481fba1c9c0668c7ff82f Mon Sep 17 00:00:00 2001 From: Sanil15 Date: Sun, 30 Sep 2018 21:17:50 -0700 Subject: [PATCH 7/7] Fixing LocalTable with side inputs test --- .../samza/test/table/TestLocalTableWithSideInputs.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java index 9866c6f722..814ad92cbd 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java @@ -62,7 +62,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness @Test public void testJoinWithSideInputsTable() { runTest( - "side-input-join", + "test", new PageViewProfileJoin(), Arrays.asList(TestTableData.generatePageViews(10)), Arrays.asList(TestTableData.generateProfiles(10))); @@ -71,7 +71,7 @@ public void testJoinWithSideInputsTable() { @Test public void testJoinWithDurableSideInputTable() { runTest( - "durable-side-input", + "test", new DurablePageViewProfileJoin(), Arrays.asList(TestTableData.generatePageViews(5)), Arrays.asList(TestTableData.generateProfiles(5))); @@ -131,7 +131,7 @@ static class PageViewProfileJoin implements StreamApplication { public void describe(StreamApplicationDescriptor appDesc) { Table> table = appDesc.getTable(getTableDescriptor()); KafkaSystemDescriptor sd = - new KafkaSystemDescriptor(appDesc.getConfig().get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM))); + new KafkaSystemDescriptor("test"); appDesc.getInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde())) .partitionBy(TestTableData.PageView::getMemberId, v -> v, "partition-page-view") .join(table, new PageViewToProfileJoinFunction())