Skip to content

MINOR: Caching layer should forward record timestamp#5423

Merged
mjsax merged 2 commits intoapache:trunkfrom
mjsax:minor-caching-window-timestamp
Jul 26, 2018
Merged

MINOR: Caching layer should forward record timestamp#5423
mjsax merged 2 commits intoapache:trunkfrom
mjsax:minor-caching-window-timestamp

Conversation

@mjsax
Copy link
Copy Markdown
Member

@mjsax mjsax commented Jul 25, 2018

More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.

Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@mjsax mjsax added the streams label Jul 25, 2018
@mjsax
Copy link
Copy Markdown
Member Author

mjsax commented Jul 25, 2018

\cc @guozhangwang @bbejeck

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a unit test for this case?

@mjsax
Copy link
Copy Markdown
Member Author

mjsax commented Jul 26, 2018

@guozhangwang Done. Extended existing unit test

Copy link
Copy Markdown
Member Author

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Contains couple of Java8 rewrite and code cleanup (tried to mark accordingly with comments)

};
reducer = (value1, value2) -> value1 + ":" + value2;
initializer = () -> 0;
aggregator = (aggKey, value, aggregate) -> aggregate + value.length();
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java8 rewrite only

return KStreamAggregationIntegrationTest.compare(o1, o2);
}
});
Collections.sort(results, KStreamAggregationIntegrationTest::compare);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java8 rewrite only

return keyComparison == 0 ? o1.value.compareTo(o2.value) : keyComparison;
}
};
Comparator.comparing((KeyValue<Windowed<String>, String> o) -> o.key.key()).thenComparing(o -> o.value);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java8 rewrite only

return KStreamAggregationIntegrationTest.compare(o1, o2);
}
});
Collections.sort(results, KStreamAggregationIntegrationTest::compare);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java8 rewrite only

new IntegerDeserializer(),
String.class,
15,
true);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add timestamps here

}
.foreach((key, value) -> {
results.put(key, value);
latch.countDown();
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java8 rewrite only

accumData.addAll(readData);
return accumData.size() >= expectedNumRecords;
}
final TestCondition valuesRead = () -> {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java8 rewrite only

readRecords(topic, consumer, waitTime, expectedNumRecords);
accumData.addAll(readData);
return accumData.size() >= expectedNumRecords;
}
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java8 rewrite only

accumData.addAll(readData);
return accumData.size() >= expectedNumRecords;
}
final TestCondition valuesRead = () -> {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java8 rewrite only

if (!Request.isValidBrokerId(metadataPartitionState.basePartitionState.leader)) {
return false;
}
TestUtils.waitForCondition(() -> {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java8 rewrite only

@mjsax mjsax merged commit 42af41d into apache:trunk Jul 26, 2018
@mjsax mjsax deleted the minor-caching-window-timestamp branch July 26, 2018 16:31
mjsax added a commit that referenced this pull request Jul 26, 2018
Reviewer: Guozhang Wang <guozhang@confluent.io>
mjsax added a commit to mjsax/kafka that referenced this pull request Jul 26, 2018
Reviewer: Guozhang Wang <guozhang@confluent.io>
@mjsax
Copy link
Copy Markdown
Member Author

mjsax commented Jul 26, 2018

Merged to trunk and cherry-picked to 2.0. PR for 1.1: #5426

guozhangwang pushed a commit that referenced this pull request Jul 26, 2018
Reviewer: Guozhang Wang <guozhang@confluent.io>
mjsax added a commit that referenced this pull request Aug 1, 2018
Reviewer: Guozhang Wang <guozhang@confluent.io>
mjsax added a commit that referenced this pull request Aug 1, 2018
Reviewer: Guozhang Wang <guozhang@confluent.io>
mjsax added a commit that referenced this pull request Aug 1, 2018
Reviewer: Guozhang Wang <guozhang@confluent.io>
mjsax added a commit that referenced this pull request Aug 1, 2018
Reviewer: Guozhang Wang <guozhang@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants