Skip to content

Conversation

@Sanil15
Copy link
Contributor

@Sanil15 Sanil15 commented Sep 20, 2018

  • The default system is a required config for intermediate streams, and since no user will write assertions against them, defaulting it makes it easier for the user to write test
  • To support stateful jobs using Table API descriptors we need to disable host affinity, which is enabled by table API by default
  • @vjagadish pointed out addConfigs vs addOverrideConfig to be a confusing user-facing API. We now support only addConfig with different signatures, this configs takes precedence over any descriptor or TestRunner generated configs

…pport TableDescriptors and refining addConfig method for TestRunner API
configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be addConfig as well? Users might be setting a default system descriptor in their application themselves.

Copy link
Contributor Author

@Sanil15 Sanil15 Sep 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure can add here

* @param config for the application
* @return this {@link TestRunner}
*/
private TestRunner putIfAbsentConfig(Map<String, String> config) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't need a helper method for a one line method, can inline this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

descriptor.toConfig().forEach(configs::putIfAbsent); Java complains Incomptaible types cannot convert object to string

so two lines have to be added
Map<String, String> d = descriptor.toConfig();
d.forEach(configs::putIfAbsent)

Hence I made a method to avoid doing this all the time

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Sanil15 this is related to the type-erasure of generic types. If you declare descriptor as InMemoryInputDescriptor<?>, the one liner would work. Otherwise, since the descriptor is of an erased type InMemoryInputDescriptor, Java will complain about type conversion failure in compile time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do

/**
* Only adds a config from {@code config} to samza job {@code configs} if they dont exist in it.
* @param config configs for the application
* Adds a config to Samza application, this config takes precedence over default configs and descriptor generated configs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use period instead of comma for unrelated sentences. "Adds a ... application. This config ..."

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

public TestRunner addOverrideConfig(String key, String value) {
Preconditions.checkNotNull(key);
Preconditions.checkNotNull(value);
public TestRunner addConfig(Config config) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should take a Map<String, String> instead of Config.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

Preconditions.checkNotNull(config);
String configKeyPrefix = String.format(JobConfig.CONFIG_JOB_PREFIX(), JOB_NAME);
configs.put(String.format("%s%s", configKeyPrefix, key), value);
config.forEach((key, value) -> configs.put(String.format("%s%s", configKeyPrefix, key), value));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can delegate to addConfig instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

Copy link
Contributor

@nickpan47 nickpan47 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for putting up the patch. A few minor comments.

* @param config for the application
* @return this {@link TestRunner}
*/
private TestRunner putIfAbsentConfig(Map<String, String> config) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Sanil15 this is related to the type-erasure of generic types. If you declare descriptor as InMemoryInputDescriptor<?>, the one liner would work. Otherwise, since the descriptor is of an erased type InMemoryInputDescriptor, Java will complain about type conversion failure in compile time.

.createStream(spec);
addConfigs(streamDescriptor.toConfig());
addConfigs(streamDescriptor.getSystemDescriptor().toConfig());
putIfAbsentConfig(streamDescriptor.toConfig());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add to the Javadoc of this method to explain why we are calling putIfAbsent here? I assume the reason is to allow user supplied config to supersede descriptor generated config? Can we make it clear?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

imsd.withInMemoryScope(this.inMemoryScope);
addConfigs(descriptor.toConfig());
addConfigs(descriptor.getSystemDescriptor().toConfig());
putIfAbsentConfig(descriptor.toConfig());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

.addInputStream(imid, inputList)
.addOutputStream(imod, 1)
.addOverrideConfig("job.container.thread.pool.size", "4")
.addConfig("job.container.thread.pool.size", "4")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Nice!

*
*/
public class TestRunner {
public static final String JOB_DEFAULT_SYSTEM = "default-samza-system";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set the visibility of both the static constants to private. These two constants are not used outside of TestRunner.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default job name is used in InMemorySystemDescriptor for jobs. prefix override

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The usage of job name in InMemorySystemDescriptor is an anti-pattern that we should not encourage. I have already reverted that in PR #642 .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted the anti-pattern, made the constants private

configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
addConfig(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM);
// This is important because Table Api enables host affinity by default for RocksDb
addConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be jobs.job_name.ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED. Otherwise, it won't override the tableDescriptor generated configurations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see the method addConfig(...), its adds that prefix

}

/**
* Only adds a config from {@code config} to samza job {@code configs} if they dont exist in it.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This java doc comment is confusing. Might be good to rename to the following(or something better):

Adds new configuration from {@param config} to samza application configuration {@code configs}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary now I removed it

imsd.withInMemoryScope(this.inMemoryScope);
addConfigs(descriptor.toConfig());
addConfigs(descriptor.getSystemDescriptor().toConfig());
putIfAbsentConfig(descriptor.toConfig());
Copy link
Contributor

@shanthoosh shanthoosh Sep 24, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

putIfAbsentConfig API here is entirely unnecessary. You had to do this, since we didn't use the StreamMessageType as a type for InMemoryInputDescriptor in the API signature(rather used a raw type). Rather than the following(at line 369):

 private <StreamMessageType> void initializeInMemoryInputStream(InMemoryInputDescriptor descriptor,
       Map<Integer, Iterable<StreamMessageType>> partitonData) {	       Map<Integer, Iterable<StreamMessageType>> partitonData) {

We can do the following :

 private <StreamMessageType> void initializeInMemoryInputStream(InMemoryInputDescriptor<? extends StreeamMessageType> descriptor,
       Map<Integer, Iterable<StreamMessageType>> partitonData) {	       Map<Integer, Iterable<StreamMessageType>> partitonData) {

Since we haven't explicitly defined the type for genericly parameterized class, type-erasure kicks in and all the methods from InMemoryInputDescriptor return response of object type. If you do the above, then you don't have to do explicit typecast in all the places(For instance, at line 372 from Object to String).

After the above API signature change putifAbsentConfig API will be unnecessary, and could be simply in-lined it with the following:

    descriptor.toConfig().forEach(this.configs::putIfAbsent);

If it does makes sense, then please change it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

public TestRunner addConfig(String key, String value) {
Preconditions.checkNotNull(key);
Preconditions.checkNotNull(value);
String configKeyPrefix = String.format(JobConfig.CONFIG_JOB_PREFIX(), JOB_NAME);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please refer to PR #642 , we need to have the full jobNameAndId in the config job prefix.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did that

.createStream(spec);
addConfigs(streamDescriptor.toConfig());
addConfigs(streamDescriptor.getSystemDescriptor().toConfig());
streamDescriptor.toConfig().forEach(this.configs::putIfAbsent);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: add a comment here to explain why we use putIfAbsent, instead of overwriting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the confix prefix, added a comment to docs

Copy link
Contributor

@nickpan47 nickpan47 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm. Thanks!

@asfgit asfgit closed this in 094ff16 Oct 1, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants