diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index fe8581b7b7..a1103dd962 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -33,7 +33,9 @@ import org.apache.samza.SamzaException; import org.apache.samza.application.LegacyTaskApplication; import org.apache.samza.application.SamzaApplication; +import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.config.Config; +import org.apache.samza.config.InMemorySystemConfig; import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.MapConfig; @@ -74,11 +76,14 @@ *
  • "task.name.grouper.factory" = {@link SingleContainerGrouperFactory}
  • *
  • "job.name" = "test-samza"
  • *
  • "processor.id" = "1"
  • + *
  • "job.default.system" = {@code JOB_DEFAULT_SYSTEM}
  • + *
  • "job.host-affinity.enabled" = "false"
  • * * */ public class TestRunner { - public static final String JOB_NAME = "samza-test"; + private static final String JOB_DEFAULT_SYSTEM = "default-samza-system"; + private static final String JOB_NAME = "samza-test"; private Map configs; private SamzaApplication app; @@ -96,6 +101,11 @@ private TestRunner() { configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName()); configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName()); configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName()); + addConfig(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM); + // This is important because Table Api enables host affinity by default for RocksDb + addConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString()); + addConfig(InMemorySystemConfig.INMEMORY_SCOPE, inMemoryScope); + addConfig(new InMemorySystemDescriptor(JOB_DEFAULT_SYSTEM).withInMemoryScope(inMemoryScope).toConfig()); } /** @@ -142,13 +152,17 @@ public static TestRunner of(SamzaApplication app) { } /** - * Only adds a config from {@code config} to samza job {@code configs} if they dont exist in it. - * @param config configs for the application + * Adds a config to Samza application. This config takes precedence over default configs and descriptor generated configs + * + * @param key of the config + * @param value of the config * @return this {@link TestRunner} */ - public TestRunner addConfigs(Map config) { - Preconditions.checkNotNull(config); - config.forEach(this.configs::putIfAbsent); + public TestRunner addConfig(String key, String value) { + Preconditions.checkNotNull(key); + Preconditions.checkNotNull(value); + String configPrefix = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId()); + configs.put(String.format("%s%s", configPrefix, key), value); return this; } @@ -157,24 +171,10 @@ public TestRunner addConfigs(Map config) { * @param config configs for the application * @return this {@link TestRunner} */ - public TestRunner addConfigs(Map config, String configPrefix) { + public TestRunner addConfig(Map config) { Preconditions.checkNotNull(config); - config.forEach((key, value) -> this.configs.putIfAbsent(String.format("%s%s", configPrefix, key), value)); - return this; - } - - /** - * Adds a config to {@code configs} if its not already present. Overrides a config value for which key is already - * exisiting in {@code configs} - * @param key key of the config - * @param value value of the config - * @return this {@link TestRunner} - */ - public TestRunner addOverrideConfig(String key, String value) { - Preconditions.checkNotNull(key); - Preconditions.checkNotNull(value); - String configKeyPrefix = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId()); - configs.put(String.format("%s%s", configKeyPrefix, key), value); + String configPrefix = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId()); + config.forEach((key, value) -> this.configs.put(String.format("%s%s", configPrefix, key), value)); return this; } @@ -202,7 +202,8 @@ private String getJobNameAndId() { } /** - * Adds the provided input stream with mock data to the test application. + * Adds the provided input stream with mock data to the test application. Default configs and user added configs have + * a higher precedence over system and stream descriptor generated configs. * @param descriptor describes the stream that is supposed to be input to Samza application * @param messages map whose key is partitionId and value is messages in the partition * @param message with null key or a KV {@link org.apache.samza.operators.KV}. @@ -220,12 +221,13 @@ public TestRunner addInputStream(InMemoryInputDescriptor des } /** - * Adds the provided output stream to the test application. + * Adds the provided output stream to the test application. Default configs and user added configs have a higher + * precedence over system and stream descriptor generated configs. * @param streamDescriptor describes the stream that is supposed to be output for the Samza application * @param partitionCount partition count of output stream * @return this {@link TestRunner} */ - public TestRunner addOutputStream(InMemoryOutputDescriptor streamDescriptor, int partitionCount) { + public TestRunner addOutputStream(InMemoryOutputDescriptor streamDescriptor, int partitionCount) { Preconditions.checkNotNull(streamDescriptor); Preconditions.checkState(partitionCount >= 1); InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) streamDescriptor.getSystemDescriptor(); @@ -238,8 +240,8 @@ public TestRunner addOutputStream(InMemoryOutputDescriptor streamDescriptor, int factory .getAdmin(streamDescriptor.getSystemName(), config) .createStream(spec); - addConfigs(streamDescriptor.toConfig()); - addConfigs(streamDescriptor.getSystemDescriptor().toConfig()); + addConfig(streamDescriptor.toConfig()); + addConfig(streamDescriptor.getSystemDescriptor().toConfig()); return this; } @@ -340,7 +342,7 @@ public static Map> consumeS * messages in the partition * @param descriptor describes a stream to initialize with the in memory system */ - private void initializeInMemoryInputStream(InMemoryInputDescriptor descriptor, + private void initializeInMemoryInputStream(InMemoryInputDescriptor descriptor, Map> partitonData) { String systemName = descriptor.getSystemName(); String streamName = (String) descriptor.getPhysicalName().orElse(descriptor.getStreamId()); @@ -352,8 +354,8 @@ private void initializeInMemoryInputStream(InMemoryInputDesc } InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) descriptor.getSystemDescriptor(); imsd.withInMemoryScope(this.inMemoryScope); - addConfigs(descriptor.toConfig()); - addConfigs(descriptor.getSystemDescriptor().toConfig(), String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId())); + addConfig(descriptor.toConfig()); + addConfig(descriptor.getSystemDescriptor().toConfig()); StreamSpec spec = new StreamSpec(descriptor.getStreamId(), streamName, systemName, partitonData.size()); SystemFactory factory = new InMemorySystemFactory(); Config config = new MapConfig(descriptor.toConfig(), descriptor.getSystemDescriptor().toConfig()); diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java index ef9508a20f..f1757abd25 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java @@ -123,7 +123,7 @@ public void testAsyncTaskWithMultiplePartitionMultithreaded() throws Exception { .of(MyAsyncStreamTask.class) .addInputStream(imid, inputPartitionData) .addOutputStream(imod, 5) - .addOverrideConfig("task.max.concurrency", "4") + .addConfig("task.max.concurrency", "4") .run(Duration.ofSeconds(2)); StreamAssert.containsInAnyOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000)); diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java index 6dd91590eb..4ebe95f3b7 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java @@ -96,7 +96,6 @@ public void testHighLevelApi() throws Exception { .of(pageViewRepartition) .addInputStream(imid, pageviews) .addOutputStream(imod, 10) - .addOverrideConfig("job.default.system", "test") .run(Duration.ofMillis(1500)); Assert.assertEquals(TestRunner.consumeStream(imod, Duration.ofMillis(1000)).get(random.nextInt(count)).size(), 1); @@ -108,27 +107,6 @@ public static > MapFunction create() { } } - /** - * Job should fail since it is missing config "job.default.system" for partitionBy Operator - */ - @Test(expected = SamzaException.class) - public void testSamzaJobStartMissingConfigFailureForStreamApplication() { - - InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test"); - - InMemoryInputDescriptor imid = isd - .getInputDescriptor("PageView", new NoOpSerde()); - - InMemoryOutputDescriptor imod = isd - .getOutputDescriptor("Output", new NoOpSerde()); - - TestRunner - .of(pageViewRepartition) - .addInputStream(imid, new ArrayList<>()) - .addOutputStream(imod, 10) - .run(Duration.ofMillis(1000)); - } - /** * Null page key is passed in input data which should fail filter logic */ @@ -154,7 +132,6 @@ public void testSamzaJobFailureForStreamApplication() { TestRunner.of(pageViewFilter) .addInputStream(imid, pageviews) .addOutputStream(imod, 10) - .addOverrideConfig("job.default.system", "test") .run(Duration.ofMillis(1000)); } diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java index bc5cba7144..55021d302a 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java @@ -102,7 +102,7 @@ public void testSyncTaskWithSinglePartitionMultithreaded() throws Exception { .of(MyStreamTestTask.class) .addInputStream(imid, inputList) .addOutputStream(imod, 1) - .addOverrideConfig("job.container.thread.pool.size", "4") + .addConfig("job.container.thread.pool.size", "4") .run(Duration.ofSeconds(1)); StreamAssert.containsInOrder(outputList, imod, Duration.ofMillis(1000)); @@ -149,7 +149,7 @@ public void testSyncTaskWithMultiplePartitionMultithreaded() throws Exception { .of(MyStreamTestTask.class) .addInputStream(imid, inputPartitionData) .addOutputStream(imod, 5) - .addOverrideConfig("job.container.thread.pool.size", "4") + .addConfig("job.container.thread.pool.size", "4") .run(Duration.ofSeconds(2)); StreamAssert.containsInOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000)); diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java index 3c22818919..814ad92cbd 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java @@ -30,10 +30,8 @@ import org.apache.samza.SamzaException; import org.apache.samza.application.StreamApplicationDescriptor; import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.StreamConfig; -import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.operators.KV; import org.apache.samza.operators.TableDescriptor; import org.apache.samza.serializers.IntegerSerde; @@ -64,7 +62,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness @Test public void testJoinWithSideInputsTable() { runTest( - "side-input-join", + "test", new PageViewProfileJoin(), Arrays.asList(TestTableData.generatePageViews(10)), Arrays.asList(TestTableData.generateProfiles(10))); @@ -73,7 +71,7 @@ public void testJoinWithSideInputsTable() { @Test public void testJoinWithDurableSideInputTable() { runTest( - "durable-side-input", + "test", new DurablePageViewProfileJoin(), Arrays.asList(TestTableData.generatePageViews(5)), Arrays.asList(TestTableData.generateProfiles(5))); @@ -85,7 +83,6 @@ private void runTest(String systemName, StreamApplication app, List pa configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM), systemName); configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PROFILE_STREAM), systemName); configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), ENRICHED_PAGEVIEW_STREAM), systemName); - configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName); InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName); @@ -103,8 +100,7 @@ private void runTest(String systemName, StreamApplication app, List pa .addInputStream(pageViewStreamDesc, pageViews) .addInputStream(profileStreamDesc, profiles) .addOutputStream(outputStreamDesc, 1) - .addConfigs(new MapConfig(configs)) - .addOverrideConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString()) + .addConfig(new MapConfig(configs)) .run(Duration.ofMillis(100000)); try { @@ -135,7 +131,7 @@ static class PageViewProfileJoin implements StreamApplication { public void describe(StreamApplicationDescriptor appDesc) { Table> table = appDesc.getTable(getTableDescriptor()); KafkaSystemDescriptor sd = - new KafkaSystemDescriptor(appDesc.getConfig().get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM))); + new KafkaSystemDescriptor("test"); appDesc.getInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde())) .partitionBy(TestTableData.PageView::getMemberId, v -> v, "partition-page-view") .join(table, new PageViewToProfileJoinFunction()) @@ -148,7 +144,6 @@ public void describe(StreamApplicationDescriptor appDesc) { .withSideInputsProcessor((msg, store) -> { Profile profile = (Profile) msg.getMessage(); int key = profile.getMemberId(); - return ImmutableList.of(new Entry<>(key, profile)); }); } @@ -162,7 +157,6 @@ static class DurablePageViewProfileJoin extends PageViewProfileJoin { .withSideInputsProcessor((msg, store) -> { TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage(); int key = profile.getMemberId(); - return ImmutableList.of(new Entry<>(key, profile)); }); }