MINOR: Pass a streams config to replace the single state dir#4714
Conversation
|
@guozhangwang failures are related, unused import |
|
Yup, I'm fixing. |
|
Triggered system tests: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1471 Succeeded. |
| stateDir.mkdir(); | ||
|
|
||
| final Properties streamsProperties = new Properties(); | ||
| final Properties streamsProperties = Utils.loadProps(propFileName); |
There was a problem hiding this comment.
propFileName could be null, do a null check here?
There was a problem hiding this comment.
I will consider adding the check within the function.
| final Serde<String> stringSerde = Serdes.String(); | ||
|
|
||
| final Properties streamsProperties = new Properties(); | ||
| final Properties streamsProperties = Utils.loadProps(propFileName); |
There was a problem hiding this comment.
same here, check for null propFileName
| final String propFileName = args.length > 1 ? args[1] : null; | ||
| final String command = args.length > 2 ? args[2] : null; | ||
|
|
||
| final Properties streamsProperties = Utils.loadProps(propFileName); |
|
|
||
| final Properties streamsProperties = new Properties(); | ||
| streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); | ||
| final Properties streamsProperties = Utils.loadProps(propFileName); |
|
Thanks @guozhangwang left a few minor comments, otherwise LGTM. One general question and sorry if I missed something, but where do we supply our own properties file or edit the existing one to add additional settings? |
Thanks for the comments. For your question, the file is added here: https://github.com/apache/kafka/pull/4714/files#diff-1dcacbd6daa49466dc7cb89022929dfbR143 that will only encode the state dir for now. In the follow-up PR I will modify simple benchmark python code to allow for other config overridden and remove the hard-coded ones in the java code. Does that make sense? |
…se-property-file-for-system-tests
mjsax
left a comment
There was a problem hiding this comment.
Thanks for the PR. Looks good overall. Couple of minor comments.
Can you also run streams system test before we merge this. Thx.
| props.load(propStream); | ||
|
|
||
| if (filename != null) { | ||
| try (InputStream propStream = new FileInputStream(filename)) { |
There was a problem hiding this comment.
Should we log something for this case?
|
|
||
| final Properties props = Utils.loadProps(propFileName); | ||
|
|
||
| String stateDirStr; |
| benchmark.run(); | ||
| } | ||
|
|
||
| public Properties setStreamProperties(final String applicationId) { |
There was a problem hiding this comment.
should we change the return type to void?
| private boolean uncaughtException = false; | ||
|
|
||
| public SmokeTestClient(File stateDir, String kafka) { | ||
| public SmokeTestClient(Properties streamsProperties, String kafka) { |
|
|
||
| private static KafkaStreams createKafkaStreams(File stateDir, String kafka) { | ||
| final Properties props = new Properties(); | ||
| private static KafkaStreams createKafkaStreams(Properties props, String kafka) { |
| System.out.println("numThreads=" + numThreads); | ||
|
|
||
| SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, loadPhase, testName, numRecords, numThreads); | ||
| SimpleBenchmark benchmark = new SimpleBenchmark(props, kafka, loadPhase, testName, numRecords, numThreads); |
There was a problem hiding this comment.
should we put kafka and numThread directly into props and reduce number of parameters here (would also simplify all the "passing through" code.
There was a problem hiding this comment.
Yes, in my follow-up PR I've done that, just for its scope I tend to keep its changes more focused.
| String kafka = args[0]; | ||
| String stateDir = args.length > 1 ? args[1] : null; | ||
| String command = args.length > 2 ? args[2] : null; | ||
| public static void main(String[] args) throws InterruptedException, IOException { |
| public class StreamsStandByReplicaTest { | ||
|
|
||
| public static void main(String[] args) { | ||
| public static void main(String[] args) throws IOException { |
Yeah I have triggered a couple streams system test, the first successful one is https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1471 |
mjsax
left a comment
There was a problem hiding this comment.
LGTM.
Is there already a follow up PR that address the pending comments?
|
I'll create the follow-up PR once I rebased this one, there are a sequence of them on my local repo :) |
This is a general change and is re-requisite to allow streams benchmark test with different streams tests. For the streams benchmark itself I will have a separate PR for switching configs. Details:
Committer Checklist (excluded from commit message)