@@ -33,7 +33,9 @@
import org.apache.samza.SamzaException;
import org.apache.samza.application.LegacyTaskApplication;
import org.apache.samza.application.SamzaApplication;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.InMemorySystemConfig;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
@@ -74,11 +76,14 @@
* <li>"task.name.grouper.factory" = {@link SingleContainerGrouperFactory}</li>
* <li>"job.name" = "test-samza"</li>
* <li>"processor.id" = "1"</li>
* <li>"job.default.system" = {@code JOB_DEFAULT_SYSTEM}</li>
* <li>"job.host-affinity.enabled" = "false"</li>
* </ol>
*
*/
public class TestRunner {
public static final String JOB_NAME = "samza-test";
private static final String JOB_DEFAULT_SYSTEM = "default-samza-system";
private static final String JOB_NAME = "samza-test";

private Map<String, String> configs;
private SamzaApplication app;
@@ -96,6 +101,11 @@ private TestRunner() {
configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
addConfig(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM);
// This is important because the Table API enables host affinity by default for RocksDB
addConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString());
Contributor: Shouldn't this be jobs.job_name.ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED? Otherwise, it won't override the tableDescriptor-generated configurations.

Contributor Author: Please see the method addConfig(...); it adds that prefix.

addConfig(InMemorySystemConfig.INMEMORY_SCOPE, inMemoryScope);
addConfig(new InMemorySystemDescriptor(JOB_DEFAULT_SYSTEM).withInMemoryScope(inMemoryScope).toConfig());
}
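
To illustrate the point from the review thread above: addConfig(...) applies the per-job override prefix itself, so callers pass plain keys. A minimal, self-contained sketch of that mechanism, assuming JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX() resolves to a format string like "jobs.%s." and that getJobNameAndId() yields the job name plus the default job id; both are assumptions for illustration, not taken from this diff:

import java.util.HashMap;
import java.util.Map;

public class AddConfigPrefixSketch {
  public static void main(String[] args) {
    // Assumed shape of the prefix; the real value comes from JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX().
    String jobNameAndId = "samza-test-1";                          // JOB_NAME plus the default job id "1"
    String configPrefix = String.format("jobs.%s.", jobNameAndId); // -> "jobs.samza-test-1."

    Map<String, String> configs = new HashMap<>();
    // A plain key such as "job.host-affinity.enabled" is stored as
    // "jobs.samza-test-1.job.host-affinity.enabled", a per-job override that wins over
    // descriptor-generated configs like the RocksDB host-affinity setting mentioned above.
    configs.put(configPrefix + "job.host-affinity.enabled", "false");
    System.out.println(configs); // {jobs.samza-test-1.job.host-affinity.enabled=false}
  }
}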

/**
@@ -142,13 +152,17 @@ public static TestRunner of(SamzaApplication app) {
}

/**
* Only adds a config from {@code config} to samza job {@code configs} if they don't exist in it.
* @param config configs for the application
* Adds a config to the Samza application. This config takes precedence over default configs and descriptor-generated configs.
*
* @param key of the config
* @param value of the config
* @return this {@link TestRunner}
*/
public TestRunner addConfigs(Map<String, String> config) {
Preconditions.checkNotNull(config);
config.forEach(this.configs::putIfAbsent);
public TestRunner addConfig(String key, String value) {
Preconditions.checkNotNull(key);
Preconditions.checkNotNull(value);
String configPrefix = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId());
configs.put(String.format("%s%s", configPrefix, key), value);
return this;
}

@@ -157,24 +171,10 @@ public TestRunner addConfigs(Map<String, String> config) {
* @param config configs for the application
* @return this {@link TestRunner}
*/
public TestRunner addConfigs(Map<String, String> config, String configPrefix) {
public TestRunner addConfig(Map<String, String> config) {
Preconditions.checkNotNull(config);
config.forEach((key, value) -> this.configs.putIfAbsent(String.format("%s%s", configPrefix, key), value));
return this;
}

/**
* Adds a config to {@code configs} if it's not already present. Overrides a config value whose key already
* exists in {@code configs}
* @param key key of the config
* @param value value of the config
* @return this {@link TestRunner}
*/
public TestRunner addOverrideConfig(String key, String value) {
Preconditions.checkNotNull(key);
Preconditions.checkNotNull(value);
String configKeyPrefix = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId());
configs.put(String.format("%s%s", configKeyPrefix, key), value);
String configPrefix = String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId());
config.forEach((key, value) -> this.configs.put(String.format("%s%s", configPrefix, key), value));
return this;
}
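
A short usage sketch of the two addConfig overloads introduced above, modeled on the tests later in this diff; MyStreamTestTask, imid, imod, and inputList are borrowed from those tests and assumed to be set up as shown there:

// Both overloads route through the per-job override prefix, so these values take
// precedence over TestRunner defaults and descriptor-generated configs.
TestRunner
    .of(MyStreamTestTask.class)
    .addInputStream(imid, inputList)
    .addOutputStream(imod, 1)
    .addConfig("job.container.thread.pool.size", "4")                   // single key/value
    .addConfig(Collections.singletonMap("job.default.system", "test"))  // whole map at once
    .run(Duration.ofSeconds(1));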

@@ -202,7 +202,8 @@ private String getJobNameAndId() {
}

/**
* Adds the provided input stream with mock data to the test application.
* Adds the provided input stream with mock data to the test application. Default configs and user-added configs
* take precedence over system and stream descriptor-generated configs.
* @param descriptor describes the stream that is supposed to be input to Samza application
* @param messages map whose key is partitionId and value is messages in the partition
* @param <StreamMessageType> message with null key or a KV {@link org.apache.samza.operators.KV}.
@@ -220,12 +221,13 @@ public <StreamMessageType> TestRunner addInputStream(InMemoryInputDescriptor des
}

/**
* Adds the provided output stream to the test application.
* Adds the provided output stream to the test application. Default configs and user-added configs take
* precedence over system and stream descriptor-generated configs.
* @param streamDescriptor describes the stream that is supposed to be output for the Samza application
* @param partitionCount partition count of output stream
* @return this {@link TestRunner}
*/
public TestRunner addOutputStream(InMemoryOutputDescriptor streamDescriptor, int partitionCount) {
public TestRunner addOutputStream(InMemoryOutputDescriptor<?> streamDescriptor, int partitionCount) {
Preconditions.checkNotNull(streamDescriptor);
Preconditions.checkState(partitionCount >= 1);
InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) streamDescriptor.getSystemDescriptor();
@@ -238,8 +240,8 @@ public TestRunner addOutputStream(InMemoryOutputDescriptor streamDescriptor, int
factory
.getAdmin(streamDescriptor.getSystemName(), config)
.createStream(spec);
addConfigs(streamDescriptor.toConfig());
addConfigs(streamDescriptor.getSystemDescriptor().toConfig());
addConfig(streamDescriptor.toConfig());
addConfig(streamDescriptor.getSystemDescriptor().toConfig());
return this;
}
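
Taken together, addInputStream and addOutputStream are typically exercised like this; a sketch assuming a two-partition input of plain Integer messages and reusing MyStreamTestTask from the tests further down (names and data are illustrative only):

// The outer map key is the partition id; the value is that partition's messages.
Map<Integer, List<Integer>> inputPartitionData = new HashMap<>();
inputPartitionData.put(0, Arrays.asList(1, 2, 3));
inputPartitionData.put(1, Arrays.asList(4, 5, 6));

InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
InMemoryInputDescriptor<Integer> imid = isd.getInputDescriptor("input", new NoOpSerde<>());
InMemoryOutputDescriptor<Integer> imod = isd.getOutputDescriptor("output", new NoOpSerde<>());

TestRunner
    .of(MyStreamTestTask.class)
    .addInputStream(imid, inputPartitionData)
    .addOutputStream(imod, 2)        // partition count of the output stream
    .run(Duration.ofSeconds(2));

// consumeStream (shown in the next hunk) returns partitionId -> messages read from that partition.
Map<Integer, List<Integer>> output = TestRunner.consumeStream(imod, Duration.ofMillis(1000));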

@@ -340,7 +342,7 @@ public static <StreamMessageType> Map<Integer, List<StreamMessageType>> consumeS
* messages in the partition
* @param descriptor describes a stream to initialize with the in memory system
*/
private <StreamMessageType> void initializeInMemoryInputStream(InMemoryInputDescriptor descriptor,
private <StreamMessageType> void initializeInMemoryInputStream(InMemoryInputDescriptor<?> descriptor,
Map<Integer, Iterable<StreamMessageType>> partitonData) {
String systemName = descriptor.getSystemName();
String streamName = (String) descriptor.getPhysicalName().orElse(descriptor.getStreamId());
@@ -352,8 +354,8 @@ private <StreamMessageType> void initializeInMemoryInputStream(InMemoryInputDesc
}
InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) descriptor.getSystemDescriptor();
imsd.withInMemoryScope(this.inMemoryScope);
addConfigs(descriptor.toConfig());
addConfigs(descriptor.getSystemDescriptor().toConfig(), String.format(JobConfig.CONFIG_OVERRIDE_JOBS_PREFIX(), getJobNameAndId()));
addConfig(descriptor.toConfig());
addConfig(descriptor.getSystemDescriptor().toConfig());
StreamSpec spec = new StreamSpec(descriptor.getStreamId(), streamName, systemName, partitonData.size());
SystemFactory factory = new InMemorySystemFactory();
Config config = new MapConfig(descriptor.toConfig(), descriptor.getSystemDescriptor().toConfig());
@@ -123,7 +123,7 @@ public void testAsyncTaskWithMultiplePartitionMultithreaded() throws Exception {
.of(MyAsyncStreamTask.class)
.addInputStream(imid, inputPartitionData)
.addOutputStream(imod, 5)
.addOverrideConfig("task.max.concurrency", "4")
.addConfig("task.max.concurrency", "4")
.run(Duration.ofSeconds(2));

StreamAssert.containsInAnyOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000));
@@ -96,7 +96,6 @@ public void testHighLevelApi() throws Exception {
.of(pageViewRepartition)
.addInputStream(imid, pageviews)
.addOutputStream(imod, 10)
.addOverrideConfig("job.default.system", "test")
.run(Duration.ofMillis(1500));

Assert.assertEquals(TestRunner.consumeStream(imod, Duration.ofMillis(1000)).get(random.nextInt(count)).size(), 1);
@@ -108,27 +107,6 @@ public static <K, V, M extends KV<K, V>> MapFunction<M, V> create() {
}
}

/**
* Job should fail since it is missing config "job.default.system" for partitionBy Operator
*/
@Test(expected = SamzaException.class)
public void testSamzaJobStartMissingConfigFailureForStreamApplication() {

InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");

InMemoryInputDescriptor<PageView> imid = isd
.getInputDescriptor("PageView", new NoOpSerde<PageView>());

InMemoryOutputDescriptor<PageView> imod = isd
.getOutputDescriptor("Output", new NoOpSerde<PageView>());

TestRunner
.of(pageViewRepartition)
.addInputStream(imid, new ArrayList<>())
.addOutputStream(imod, 10)
.run(Duration.ofMillis(1000));
}

/**
* Null page key is passed in input data which should fail filter logic
*/
@@ -154,7 +132,6 @@ public void testSamzaJobFailureForStreamApplication() {
TestRunner.of(pageViewFilter)
.addInputStream(imid, pageviews)
.addOutputStream(imod, 10)
.addOverrideConfig("job.default.system", "test")
.run(Duration.ofMillis(1000));
}

@@ -102,7 +102,7 @@ public void testSyncTaskWithSinglePartitionMultithreaded() throws Exception {
.of(MyStreamTestTask.class)
.addInputStream(imid, inputList)
.addOutputStream(imod, 1)
.addOverrideConfig("job.container.thread.pool.size", "4")
.addConfig("job.container.thread.pool.size", "4")
Contributor: 👍 Nice!

.run(Duration.ofSeconds(1));

StreamAssert.containsInOrder(outputList, imod, Duration.ofMillis(1000));
@@ -149,7 +149,7 @@ public void testSyncTaskWithMultiplePartitionMultithreaded() throws Exception {
.of(MyStreamTestTask.class)
.addInputStream(imid, inputPartitionData)
.addOutputStream(imod, 5)
.addOverrideConfig("job.container.thread.pool.size", "4")
.addConfig("job.container.thread.pool.size", "4")
.run(Duration.ofSeconds(2));

StreamAssert.containsInOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000));
@@ -30,10 +30,8 @@
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplicationDescriptor;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.TableDescriptor;
import org.apache.samza.serializers.IntegerSerde;
@@ -64,7 +62,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
@Test
public void testJoinWithSideInputsTable() {
runTest(
"side-input-join",
"test",
new PageViewProfileJoin(),
Arrays.asList(TestTableData.generatePageViews(10)),
Arrays.asList(TestTableData.generateProfiles(10)));
@@ -73,7 +71,7 @@ public void testJoinWithDurableSideInputTable() {
@Test
public void testJoinWithDurableSideInputTable() {
runTest(
"durable-side-input",
"test",
new DurablePageViewProfileJoin(),
Arrays.asList(TestTableData.generatePageViews(5)),
Arrays.asList(TestTableData.generateProfiles(5)));
@@ -85,7 +83,6 @@ private void runTest(String systemName, StreamApplication app, List<PageView> pa
configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM), systemName);
configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PROFILE_STREAM), systemName);
configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), ENRICHED_PAGEVIEW_STREAM), systemName);
configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName);

InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName);

@@ -103,8 +100,7 @@ private void runTest(String systemName, StreamApplication app, List<PageView> pa
.addInputStream(pageViewStreamDesc, pageViews)
.addInputStream(profileStreamDesc, profiles)
.addOutputStream(outputStreamDesc, 1)
.addConfigs(new MapConfig(configs))
.addOverrideConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString())
.addConfig(new MapConfig(configs))
.run(Duration.ofMillis(100000));

try {
@@ -135,7 +131,7 @@ static class PageViewProfileJoin implements StreamApplication {
public void describe(StreamApplicationDescriptor appDesc) {
Table<KV<Integer, TestTableData.Profile>> table = appDesc.getTable(getTableDescriptor());
KafkaSystemDescriptor sd =
new KafkaSystemDescriptor(appDesc.getConfig().get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM)));
new KafkaSystemDescriptor("test");
appDesc.getInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>()))
.partitionBy(TestTableData.PageView::getMemberId, v -> v, "partition-page-view")
.join(table, new PageViewToProfileJoinFunction())
@@ -148,7 +144,6 @@ public void describe(StreamApplicationDescriptor appDesc) {
.withSideInputsProcessor((msg, store) -> {
Profile profile = (Profile) msg.getMessage();
int key = profile.getMemberId();

return ImmutableList.of(new Entry<>(key, profile));
});
}
@@ -162,7 +157,6 @@ static class DurablePageViewProfileJoin extends PageViewProfileJoin {
.withSideInputsProcessor((msg, store) -> {
TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage();
int key = profile.getMemberId();

return ImmutableList.of(new Entry<>(key, profile));
});
}