Skip to content

MINOR: fix time discrepancy between TestInputTopic and TopologyTestDriver that creates it#17702

Merged
ableegoldman merged 1 commit intoapache:trunkfrom
ableegoldman:TTD-match-TestInputTopic-start-timestamp-with-driver
Nov 9, 2024
Merged

MINOR: fix time discrepancy between TestInputTopic and TopologyTestDriver that creates it#17702
ableegoldman merged 1 commit intoapache:trunkfrom
ableegoldman:TTD-match-TestInputTopic-start-timestamp-with-driver

Conversation

@ableegoldman
Copy link
Copy Markdown
Member

Fixes an issue with the TTD in the specific case where users don't specify an initial time for the driver and also don't specify a start timestamp for the TestInputTopic, then pipe input records without timestamps. This combination results in a slight mismatch in the expected timestamps for the piped records, which can be noticeable when writing tests with very small time deltas.

The problem is that, while both the TTD and the TestInputTopic will be initialized to the "current time" when not otherwise specified, it's possible for some milliseconds to have passed between the creation of the TTD and the creation of the TestInputTopic. This can result in a TestInputTopic getting a start timestamp that's several ms larger than the driver's time, and ultimately causing the piped input records to have timestamps slightly in the future relative to the driver.

In practice even those who hit this issue might not notice it if they aren't manipulating time in their tests, or are advancing time by enough to negate the several-milliseconds of difference. However we noticed a test fail due to this because we were testing a ttl-based processor and had advanced the driver time by only 1 millisecond past the ttl. The piped record should have been expired, but because it's timestamp was a few milliseconds longer than the driver's start time, this test ended up failing.

@github-actions github-actions Bot added streams small Small PRs labels Nov 6, 2024
* @throws TaskMigratedException
*/
protected void commitTransaction(final Map<TopicPartition, OffsetAndMetadata> offsets,
public void commitTransaction(final Map<TopicPartition, OffsetAndMetadata> offsets,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated to the main fix, just doing some cleanup of the TTD on the side. I don't see why we can't just make this method public since it's an internal class anyways -- which was the exact reasoning I saw on the PR which originally changed it from private to protected anyway. But I can leave this part out if there are any concerns

}
}

private static class TestDriverProducer extends StreamsProducer {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class isn't doing anything, right? I'm unclear on its purpose and figured I might as well clean it up from the TTD -- but again, happy to undo this change since it's not needed for the main fix

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting... seems fine to me

}
}

private static class TestDriverProducer extends StreamsProducer {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class isn't doing anything, right? I'm unclear on its purpose and figured I might as well clean it up from the TTD -- but again, happy to undo this change since it's not needed for the main fix

topicName,
keySerializer,
valueSerializer,
Instant.ofEpochMilli(mockWallClockTime.milliseconds()),
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main fix: instead of initializing to the current time, we should initialize to the driver's current time. I feel like this makes more sense anyway...we mock time in the driver so why should the TestInputTopic use a view of time based on actual wallclock time rather than the driver's time?

And would this need a KIP?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it needs a KIP

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree this doesn't need a KIP, plus there's an overload where users can provide their own time if needed. Although, you make a great point about synchronizing the time with the driver.

@@ -738,7 +737,14 @@ private Queue<ProducerRecord<byte[], byte[]>> getRecordsQueue(final String topic
public final <K, V> TestInputTopic<K, V> createInputTopic(final String topicName,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also wondering why all these methods are final -- I could have addressed this issue in our code without this PR if I could just override these methods. Are we concerned about users doing this..? And why?

I figure this change actually would need a KIP so I left it out, but I'd like to remove the final from these unless someone knows a good reason for doing this

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, you'd need a KIP for removing it. I'm fine with merging this fix without removing the final.

For removing the final -- it does make sense to restrict the surface area for extending the public interface. I'm not sure why we'd need to override those. But this is something for the KIP discussion anyways, if you want to write one

Copy link
Copy Markdown
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

@ableegoldman ableegoldman force-pushed the TTD-match-TestInputTopic-start-timestamp-with-driver branch from 608b029 to 2eeb6ab Compare November 7, 2024 04:25
@ableegoldman ableegoldman merged commit 32d9dec into apache:trunk Nov 9, 2024
chiacyu pushed a commit to chiacyu/kafka that referenced this pull request Nov 30, 2024
Fixes an issue with the TTD in the specific case where users don't specify an initial time for the driver and also don't specify a start timestamp for the TestInputTopic, then pipe input records without timestamps. This combination results in a slight mismatch in the expected timestamps for the piped records, which can be noticeable when writing tests with very small time deltas.

The problem is that, while both the TTD and the TestInputTopic will be initialized to the "current time" when not otherwise specified, it's possible for some milliseconds to have passed between the creation of the TTD and the creation of the TestInputTopic. This can result in a TestInputTopic getting a start timestamp that's several ms larger than the driver's time, and ultimately causing the piped input records to have timestamps slightly in the future relative to the driver.

In practice even those who hit this issue might not notice it if they aren't manipulating time in their tests, or are advancing time by enough to negate the several-milliseconds of difference. However we noticed a test fail due to this because we were testing a ttl-based processor and had advanced the driver time by only 1 millisecond past the ttl. The piped record should have been expired, but because it's timestamp was a few milliseconds longer than the driver's start time, this test ended up failing.

Reviewers: Matthias Sax <mjsax@apache.org>, Bruno Cadonna <cadonna@apache.org>, Lucas Brutschy < lbrutschy@confluent.io>
eduwercamacaro added a commit to littlehorse-enterprises/kafka that referenced this pull request Dec 17, 2024
* KAFKA-17926 Improve the documentation explaining why max.in.flight.requests.per.connection should not exceed 5 (apache#17719)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* MINOR: Cleanup GroupCoordinatorRecordHelpers (apache#17718)

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>

* rebase to fix merge conflict (apache#17702)

Fixes an issue with the TTD in the specific case where users don't specify an initial time for the driver and also don't specify a start timestamp for the TestInputTopic, then pipe input records without timestamps. This combination results in a slight mismatch in the expected timestamps for the piped records, which can be noticeable when writing tests with very small time deltas.

The problem is that, while both the TTD and the TestInputTopic will be initialized to the "current time" when not otherwise specified, it's possible for some milliseconds to have passed between the creation of the TTD and the creation of the TestInputTopic. This can result in a TestInputTopic getting a start timestamp that's several ms larger than the driver's time, and ultimately causing the piped input records to have timestamps slightly in the future relative to the driver.

In practice even those who hit this issue might not notice it if they aren't manipulating time in their tests, or are advancing time by enough to negate the several-milliseconds of difference. However we noticed a test fail due to this because we were testing a ttl-based processor and had advanced the driver time by only 1 millisecond past the ttl. The piped record should have been expired, but because it's timestamp was a few milliseconds longer than the driver's start time, this test ended up failing.

Reviewers: Matthias Sax <mjsax@apache.org>, Bruno Cadonna <cadonna@apache.org>, Lucas Brutschy < lbrutschy@confluent.io>

* KAFKA-17801: RemoteLogManager may compute inaccurate upperBoundOffset for aborted txns (apache#17676)

Reviewers: Jun Rao <junrao@gmail.com>

* MINOR: log fix in SnapshottableCoordinator (apache#17726)

Reviewers: donaldzhu-cc, Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-17570 Rewrite StressTestLog by Java (apache#17249)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* MINOR: Delete unused member from KafkaAdminClient (apache#17729)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-17970 Moving some share purgatory classes from core to share module (apache#17722)

As part of PR: apache#17636 where purgatory has been moved from core to server-common hence move some existing classes used in Share Fetch to Share module.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-17837 Rewrite DeleteTopicTest (apache#17579)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* MINOR: Move StopPartition to server-common (apache#17704)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-15549 Bump swagger dependency version from 2.2.8 to 2.2.25 (apache#17730)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-17872: Update consumed offsets on records with invalid timestamp (apache#17710)

TimestampExtractor allows to drop records by returning a timestamp of -1. For this case, we still need to update consumed offsets to allows us to commit progress.

Reviewers: Bill Bejeck <bill@confluent.io>

* KAFKA-17925 Convert Kafka Client integration tests to use KRaft (apache#17670)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-17779: Fix flaky RemoteLogManager test (apache#17724)

Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>

* KAFKA-17455: fixes stuck producer by polling again after pollDelayMs in NetworkUtils#awaitReady()

* clarifies comments

* attempts to add test

* Adds a test but my changes to MockClient.java broke all sorts of stuff

* test that passes on my branch and fails on trunk

* addresses PR feedback: rename MockClient#setAdvanceTimeDuringPoll to advanceTimeDuringPoll()

---------

Co-authored-by: PoAn Yang <payang@apache.org>
Co-authored-by: David Jacot <djacot@confluent.io>
Co-authored-by: A. Sophie Blee-Goldman <ableegoldman@gmail.com>
Co-authored-by: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
Co-authored-by: Jeff Kim <kimkb2011@gmail.com>
Co-authored-by: TengYao Chi <kitingiao@gmail.com>
Co-authored-by: Apoorv Mittal <apoorvmittal10@gmail.com>
Co-authored-by: Mickael Maison <mimaison@users.noreply.github.com>
Co-authored-by: Yung <yungyung7654321@gmail.com>
Co-authored-by: Matthias J. Sax <matthias@confluent.io>
Co-authored-by: Kirk True <kirk@kirktrue.pro>
Co-authored-by: wperlichek <61857706+wperlichek@users.noreply.github.com>
Co-authored-by: Colt McNealy <colt@littlehorse.io>
tedyu pushed a commit to tedyu/kafka that referenced this pull request Jan 6, 2025
Fixes an issue with the TTD in the specific case where users don't specify an initial time for the driver and also don't specify a start timestamp for the TestInputTopic, then pipe input records without timestamps. This combination results in a slight mismatch in the expected timestamps for the piped records, which can be noticeable when writing tests with very small time deltas.

The problem is that, while both the TTD and the TestInputTopic will be initialized to the "current time" when not otherwise specified, it's possible for some milliseconds to have passed between the creation of the TTD and the creation of the TestInputTopic. This can result in a TestInputTopic getting a start timestamp that's several ms larger than the driver's time, and ultimately causing the piped input records to have timestamps slightly in the future relative to the driver.

In practice even those who hit this issue might not notice it if they aren't manipulating time in their tests, or are advancing time by enough to negate the several-milliseconds of difference. However we noticed a test fail due to this because we were testing a ttl-based processor and had advanced the driver time by only 1 millisecond past the ttl. The piped record should have been expired, but because it's timestamp was a few milliseconds longer than the driver's start time, this test ended up failing.

Reviewers: Matthias Sax <mjsax@apache.org>, Bruno Cadonna <cadonna@apache.org>, Lucas Brutschy < lbrutschy@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants