Skip to content

MINOR: capture result timestamps in Kafka Streams DSL tests#6447

Merged
mjsax merged 2 commits intoapache:trunkfrom
mjsax:minor-dsl-test-cleanup
Mar 20, 2019
Merged

MINOR: capture result timestamps in Kafka Streams DSL tests#6447
mjsax merged 2 commits intoapache:trunkfrom
mjsax:minor-dsl-test-cleanup

Conversation

@mjsax
Copy link
Copy Markdown
Member

@mjsax mjsax commented Mar 14, 2019

To enable proper testing of timestamp propagation in the DSL, this PR updates MockProcessor to capture the record timestamp.

All tests using MockProcessor need to be updated accordingly. This PR only sets most timestamps to zero for now. When we update the DSL semantics, those test will gradually be updated accordingly.

Some code cleanup on the side (Java8 rewrites, reformatting, getting rid of warnings.)

@mjsax mjsax added the streams label Mar 14, 2019
@mjsax
Copy link
Copy Markdown
Member Author

mjsax commented Mar 14, 2019

Call for review @guozhangwang @bbejeck @vvcephei @ableegoldman

private String topicName = "topic";
private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
private final ConsumerRecordFactory<Integer, String> recordFactory =
new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer(), 0L);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We set the initial timestamp of the factory to zero (similar in other tests).

assertEquals(6, supplier.theCapturedProcessor().processed.size());

final String[] expected = {"10:V1", "20:V2", "21:V2", "30:V3", "31:V3", "32:V3"};
final String[] expected = {"10:V1 (ts: 0)", "20:V2 (ts: 0)", "21:V2 (ts: 0)", "30:V3 (ts: 0)", "31:V3 (ts: 0)", "32:V3 (ts: 0)"};
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test of the expected result, also checks the timestamp now. Similar in other tests.


try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {

try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to set initial wall-clock time of the driver (removed to avoid confusion with event-time). Similar on other places.

}

processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
processor.checkAndClearProcessResult("0:X0+YY0 (ts: 1100)", "1:X1+YY1 (ts: 1100)", "2:X2+YY2 (ts: 1100)", "3:X3+YY3 (ts: 1100)");
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test does modify timestamps to test window boundaries, thus, result timestamps are not zero.

}

processor.checkAndClearProcessResult("0:X0+YY0");
processor.checkAndClearProcessResult("0:X0+YY0 (ts: 1000)");
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above.

}

processor.checkAndClearProcessResult("0:X0+YY0");
processor.checkAndClearProcessResult("0:X0+YY0 (ts: 900)");
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above.

driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey, time));
}
processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
processor.checkAndClearProcessResult("0:XX0+Y0 (ts: 1100)", "1:XX1+Y1 (ts: 1100)", "2:XX2+Y2 (ts: 1100)", "3:XX3+Y3 (ts: 1100)");
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above.

"[B@5/15]:0+2+2+2+2", "[B@10/20]:0+2+2",
"[C@5/15]:0+3+3", "[C@10/20]:0+3"
"[A@0/10]:0+1 (ts: 0)",
"[B@0/10]:0+2 (ts: 1)",
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test sets timestamps explicitly so we get non-zero timestamps.

processed.add(
(key == null ? "null" : key) +
":" + (value == null ? "null" : value) +
" (ts: " + context().timestamp() + ")"
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual change of this PR to capture the result timestamp.

Copy link
Copy Markdown
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Mar 15, 2019

@mjsax test failures seem related, but from looking at the results the values are the same but it looks like the comparison is asserting instance equality.

@mjsax
Copy link
Copy Markdown
Member Author

mjsax commented Mar 15, 2019

Fixed, by adding timestamps comparison.

@guozhangwang
Copy link
Copy Markdown
Contributor

LGTM!

@mjsax mjsax merged commit b5ce093 into apache:trunk Mar 20, 2019
@mjsax mjsax deleted the minor-dsl-test-cleanup branch March 20, 2019 00:27
jarekr pushed a commit to confluentinc/kafka that referenced this pull request Apr 18, 2019
* apache/trunk: (23 commits)
  KAFKA-7986: Distinguish logging from different ZooKeeperClient instances (apache#6493)
  KAFKA-8102: Add an interval-based Trogdor transaction generator (apache#6444)
  MINOR: Fix misspelling in protocol documentation
  KAFKA-8150: Fix bugs in handling null arrays in generated RPC code (apache#6489)
  KAFKA-8014: Extend Connect integration tests to add and remove workers dynamically (apache#6342)
  MINOR: Remove line for testing repartition topic name (apache#6488)
  MINOR: add MacOS requirement to Streams docs
  MINOR: fix message protocol help text for ElectPreferredLeadersResult (apache#6479)
  MINOR: list-topics should not require topic param
  MINOR: Clean up ThreadCacheTest (apache#6485)
  MINOR: Avoid unnecessary collection copy in MetadataCache (apache#6397)
  KAFKA-8142: Fix NPE for nulls in Headers (apache#6484)
  KAFKA-7243: Add unit integration tests to validate metrics in Kafka Streams (apache#6080)
  MINOR: Add verification step for Streams archetype to Jenkins build (apache#6431)
  KAFKA-7819: Improve RoundTripWorker (apache#6187)
  KAFKA-7989: RequestQuotaTest should wait for quota config change before running tests (apache#6482)
  KAFKA-8098: Fix Flaky Test testConsumerGroups
  KAFKA-6958: Add new NamedOperation interface to enforce consistency in naming operations (apache#6409)
  MINOR: capture result timestamps in Kafka Streams DSL tests (apache#6447)
  MINOR: updated names for deprecated streams constants (apache#6466)
  ...
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
)

Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kip Requires or implements a KIP streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants