KAFKA-8615: Change to track partition time breaks TimestampExtractor#7054

Merged
guozhangwang merged 13 commits into apache:trunk from ableegoldman:fix-timestamp-extractor
Jul 18, 2019

Conversation

@ableegoldman
Member

@ableegoldman ableegoldman commented Jul 9, 2019

The timestamp extractor takes a previousTimestamp parameter which should be the partition time. This PR adds back in partition time tracking for the extractor, and renames previousTimestamp --> partitionTime

Should be cherry-picked back to 2.1
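
To illustrate why the second argument matters, here is a minimal, self-contained sketch of an extractor that falls back to partition time when a record carries no valid timestamp of its own. `SimpleRecord`, `SimpleTimestampExtractor`, and `FallbackToPartitionTime` are hypothetical stand-ins (loosely modelled on Kafka's `ConsumerRecord` and `TimestampExtractor`) so that the example compiles without the Kafka libraries; it is not the code from this PR, and it assumes partition time means the highest timestamp extracted so far from the record's partition.

```java
// Hypothetical, simplified stand-ins; not the real Kafka classes.
final class SimpleRecord {
    final String value;
    final long timestamp; // negative means "no valid embedded timestamp"

    SimpleRecord(final String value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

interface SimpleTimestampExtractor {
    // partitionTime (assumption): highest timestamp extracted so far from this
    // record's partition, or a negative value if none is known yet.
    long extract(SimpleRecord record, long partitionTime);
}

// Uses the record's own timestamp when valid, otherwise falls back to partition time.
final class FallbackToPartitionTime implements SimpleTimestampExtractor {
    @Override
    public long extract(final SimpleRecord record, final long partitionTime) {
        if (record.timestamp >= 0) {
            return record.timestamp;
        }
        if (partitionTime < 0) {
            throw new IllegalStateException("No valid timestamp and no partition time yet");
        }
        return partitionTime;
    }
}
```

An extractor like this only behaves sensibly if the value passed in reflects records that were already seen; if it were given the timestamp of a record still waiting in the buffer, the fallback would assign a time from the future.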

@ableegoldman
Member Author

@mjsax mjsax added the streams label Jul 9, 2019
Member

@mjsax mjsax left a comment

Can we add a test?

@mjsax
Member

mjsax commented Jul 9, 2019

Also, I think we need to add back the notion of partition time. Currently, we pass in the next() timestamp, which is not the previous timestamp (i.e., we look into the "future" when we should look into the "past").
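
The "past" versus "future" distinction can be sketched as follows. This is a hypothetical, stripped-down buffer (`PartitionBuffer` is an invented name, and plain `long` timestamps stand in for records), not the actual RecordQueue: partition time is updated only after a timestamp has been extracted, so the extraction step only ever sees a value derived from earlier records.

```java
import java.util.ArrayDeque;

// Hypothetical sketch: a per-partition buffer that tracks partition time as the
// highest timestamp extracted so far and hands it to the extraction step.
final class PartitionBuffer {
    static final long UNKNOWN = -1L; // assumed sentinel for "no time known yet"

    private final ArrayDeque<Long> rawTimestamps = new ArrayDeque<>();
    private long partitionTime = UNKNOWN;

    void add(final long rawTimestamp) {
        rawTimestamps.addLast(rawTimestamp);
    }

    // Returns the timestamp assigned to the next record, or UNKNOWN if the buffer
    // is empty. An invalid first record also yields UNKNOWN here; a real
    // implementation would have to decide how to handle that case.
    long pollTimestamp() {
        final Long raw = rawTimestamps.pollFirst();
        if (raw == null) {
            return UNKNOWN;
        }
        // The extraction sees the "past": partitionTime reflects only earlier records.
        final long extracted = raw >= 0 ? raw : partitionTime;
        partitionTime = Math.max(partitionTime, extracted);
        return extracted;
    }

    long partitionTime() {
        return partitionTime;
    }
}
```

With raw timestamps 10, -1, 5, -1 the assigned timestamps are 10, 10, 5, 10: each invalid record inherits the highest time seen before it, and the out-of-order 5 does not move partition time backwards.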

Comment thread streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java Outdated
Member

@bbejeck bbejeck left a comment

Thanks for the patch @ableegoldman. Just one minor comment; otherwise LGTM modulo comments from @mjsax.

@bbejeck
Member

bbejeck commented Jul 9, 2019

retest this please

Member

@mjsax mjsax left a comment

Some follow up comments, based on our in-person discussion today.

  final long timestamp;
  try {
-     timestamp = timestampExtractor.extract(deserialized, timestamp());
+     timestamp = timestampExtractor.extract(deserialized, partitionTime);
Member

Can we also rename the method timestamp() to headRecordTimestamp() to be more explicit about what it returns? It's orthogonal to the actual fix, but might be a good improvement.

Member

Similarly, can we piggy-back some cleanup of PartitionGroup?

- rename PartitionGroup#timestamp() to PartitionGroup#streamTime()
(also update the JavaDocs, which seem to be wrong)

- in `clear()`, reset `streamTime` to UNKNOWN?

- in `nextRecord()`: do we need to check if `queue != null`, and do we need to check if `record != null`? (It seems to be ensured that neither can ever be `null`.)

Member Author

Ack to all...except the last point. We do check both for null..?

Member

It seems we check both for null at the moment:

        final RecordQueue queue = nonEmptyQueuesByTime.poll();
        info.queue = queue;

        if (queue != null) {
            // get the first record from this queue.
            record = queue.poll();

            if (record != null) {
                --totalBuffered;

But I think they cannot be null, can they?

Member Author

Sorry, misunderstood your question. Yes, either one could potentially be null if we don't yet have new records to process?

Member

Good point: `final RecordQueue queue = nonEmptyQueuesByTime.poll();` could return null. However, I am wondering whether `record = queue.poll();` could return null: the collection is called nonEmptyQueuesByTime, so a queue taken from it should never be empty?

Contributor

I think I agree the second null check should never happen.
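
The two null cases discussed in this thread can be sketched with plain collections. This is a hypothetical reduction (`NonEmptyQueues` is an invented class, and `Long` timestamps stand in for records), not the PartitionGroup code: polling the outer priority queue may return null when nothing is buffered, whereas polling a queue taken from it cannot, provided only non-empty queues are ever inserted.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical sketch of the nextRecord() pattern, using Long timestamps as records.
final class NonEmptyQueues {
    // Ordered by the timestamp at the head of each queue.
    private final PriorityQueue<ArrayDeque<Long>> nonEmptyQueuesByTime =
        new PriorityQueue<>(Comparator.comparingLong((ArrayDeque<Long> q) -> q.peekFirst()));

    void addQueue(final ArrayDeque<Long> queue) {
        // Invariant: only queues holding at least one record are ever inserted.
        if (!queue.isEmpty()) {
            nonEmptyQueuesByTime.add(queue);
        }
    }

    // Returns the head record with the lowest timestamp, or null if nothing is buffered.
    Long nextRecord() {
        final ArrayDeque<Long> queue = nonEmptyQueuesByTime.poll();
        if (queue == null) {
            return null; // legitimately possible: no queue has buffered records
        }
        final Long record = queue.pollFirst(); // non-null as long as the invariant holds
        if (!queue.isEmpty()) {
            nonEmptyQueuesByTime.add(queue); // re-insert, now ordered by its new head
        }
        return record;
    }
}
```

Under that invariant the inner check is purely defensive, which matches the conclusion reached here: it could be dropped, or kept as a cheap safeguard.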

@@ -41,8 +41,8 @@
*
* PartitionGroup also maintains a stream-time for the group as a whole.
Member

Maybe we should change "stream-time" to "task-time"? @vvcephei suggested it (and I like it) in an in-person discussion. If we agree, the corresponding variable and method names should also be updated. Thoughts?

Should it be "stream-time" or "stream time" ?

Member Author

@ableegoldman ableegoldman Jul 11, 2019

I support "task-time" -- it would be good to distinguish it from "stream time" should we ever want/need a "global" stream time. That said, we use stream time all over, including in things like PunctuationType. Will it be confusing to have maybePunctuateStreamTime() punctuate on something called task-time? Though we already refer to it separately as "partition group timestamp", "stream partition time", AND "stream time" in that method...

Regarding "stream-time" vs "stream time" -- we use the hyphen when referring to types of time semantics (eg event-time) so I'd favor "stream time" to maintain a distinction between "semantics types" and "time definitions" -- WDYT?

Member Author

@ableegoldman ableegoldman Jul 11, 2019

On the other hand, maybe phrases like "lowest task time" might be ambiguous (what does "lowest task" mean and why do we care about its time?) so I'm good with task-time.

* The PartitionGroup's stream-time is also the stream-time of its task and is used as the
* stream-time for any computations that require it.
* Note however that any computation that depends on stream time tracks it on a per-operator basis to obtain an
* accurate view of the local stream time as seen by that node.
Member

node -> processor? Maybe we could call this "processor time" ?

Member Author

How about just local stream time -> local time, to avoid introducing too many types of time? Or do you feel "processor time" is clear enough (possibly more so than "local time")? WDYT?

Member Author

In light of renaming "stream time" -> "task time" I think it does make sense to call this "processor time" and establish a clear naming hierarchy

@bbejeck
Member

bbejeck commented Jul 11, 2019

retest this please

@bbejeck
Member

bbejeck commented Jul 12, 2019

java 11 scala 2.13 timed out, java 8 failure unrelated

retest this please

@bbejeck
Member

bbejeck commented Jul 15, 2019

java 11 scala 2.12 failed, java 8 failed
java 11 scala 2.13 passed. All test results cleaned out already.

retest this please

@mjsax
Member

mjsax commented Jul 16, 2019

Java11/2.12: (https://issues.apache.org/jira/browse/KAFKA-8672)

java.lang.RuntimeException: Could not find enough records. found 33, expected 100
	at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.consume(EmbeddedKafkaCluster.java:306)
	at org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testReconfigConnector(RebalanceSourceConnectorsIntegrationTest.java:180)

Java11/2.13 (https://issues.apache.org/jira/browse/KAFKA-8555)

org.apache.kafka.connect.errors.DataException: Insufficient records committed by connector simple-conn in 15000 millis. Records expected=2000, actual=1500
	at org.apache.kafka.connect.integration.ConnectorHandle.awaitCommits(ConnectorHandle.java:188)
	at org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector(ExampleConnectIntegrationTest.java:181)

Java8: env error

stderr: fatal: unable to connect to github.com:
13:08:31 github.com: Temporary failure in name resolution

Retest this please.

@mjsax
Member

mjsax commented Jul 16, 2019

Java8 failed again with env error
Java 11/2.13 failed with known ExampleConnectIntegrationTest.testSourceConnector
Java 11/2.12 passed.

Retest this please.

@ableegoldman
Member Author

Java 8 and Java 11/2.12 passed; Java 11/2.13 failed with known kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic and known kafka.server.LogDirFailureTest.testIOExceptionDuringLogRoll

   * Return the stream-time of this partition group defined as the largest timestamp seen across all partitions
   */
- public long timestamp() {
+ public long streamTime() {
Member

Should we rename this to taskTime()? (Just a thought.)

Member Author

I'm in favor of that in theory -- but then do we also rename maybePunctuateStreamTime? Do we also deprecate PunctuationType.STREAM_TIME in favor of PunctuationType.TASK_TIME? Task time does seem more appropriate, but I'm hesitant to mix terminology...

Contributor

Let's just keep it as streamTime for now.
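
For reference, the semantics stated in the Javadoc for this method (the largest timestamp seen across all partitions), combined with the earlier suggestion to reset the value in `clear()`, can be sketched as below. `StreamTimeTracker`, `observe`, and the `UNKNOWN` sentinel value are hypothetical names and choices made for this illustration only; the actual PartitionGroup may differ.

```java
// Hypothetical sketch: stream time as the largest timestamp seen across all
// partitions of a group. It only moves forward until clear() resets it.
final class StreamTimeTracker {
    static final long UNKNOWN = -1L; // assumed sentinel for "no record seen yet"

    private long streamTime = UNKNOWN;

    // Called with the timestamp of each record handed out by the group.
    void observe(final long recordTimestamp) {
        streamTime = Math.max(streamTime, recordTimestamp);
    }

    long streamTime() {
        return streamTime;
    }

    void clear() {
        streamTime = UNKNOWN;
    }
}
```

Because the value is a running maximum, a late record with a smaller timestamp leaves stream time unchanged.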

Contributor

@guozhangwang guozhangwang left a comment

LGTM. @ableegoldman I think we can remove the second null check but I'm okay keeping it as well just to be safe.

@guozhangwang guozhangwang merged commit 62fbc92 into apache:trunk Jul 18, 2019
guozhangwang pushed a commit that referenced this pull request Jul 18, 2019
…7054)

The timestamp extractor takes a previousTimestamp parameter which should be the partition time. This PR adds back in partition time tracking for the extractor, and renames previousTimestamp --> partitionTime

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>, Matthias J. Sax <mjsax@apache.org>
@guozhangwang
Contributor

Cherry-picked to 2.3 as well.

@ableegoldman
Member Author

Thanks @guozhangwang! I think it should actually be cherry-picked all the way back to 2.1

I don't think there should be conflicts but if you need a separate PR for the earlier branches, let me know

guozhangwang pushed a commit that referenced this pull request Jul 18, 2019
guozhangwang pushed a commit that referenced this pull request Jul 19, 2019
@guozhangwang
Contributor

Cherry-picked to 2.2 and 2.1.

ijuma added a commit to confluentinc/kafka that referenced this pull request Jul 20, 2019
* apache-github/2.3:
  MINOR: Update documentation for enabling optimizations (apache#7099)
  MINOR: Remove stale streams producer retry default docs. (apache#6844)
  KAFKA-8635; Skip client poll in Sender loop when no request is sent (apache#7085)
  KAFKA-8615: Change to track partition time breaks TimestampExtractor (apache#7054)
  KAFKA-8670; Fix exception for kafka-topics.sh --describe without --topic mentioned (apache#7094)
  KAFKA-8602: Separate PR for 2.3 branch (apache#7092)
  KAFKA-8530; Check for topic authorization errors in OffsetFetch response (apache#6928)
  KAFKA-8662; Fix producer metadata error handling and consumer manual assignment (apache#7086)
  KAFKA-8637: WriteBatch objects leak off-heap memory (apache#7050)
  KAFKA-8620: fix NPE due to race condition during shutdown while rebalancing (apache#7021)
  HOT FIX: close RocksDB objects in correct order (apache#7076)
  KAFKA-7157: Fix handling of nulls in TimestampConverter (apache#7070)
  KAFKA-6605: Fix NPE in Flatten when optional Struct is null (apache#5705)
  Fixes apache#8198 KStreams testing docs use non-existent method pipe (apache#6678)
  KAFKA-5998: fix checkpointableOffsets handling (apache#7030)
  KAFKA-8653; Default rebalance timeout to session timeout for JoinGroup v0 (apache#7072)
  KAFKA-8591; WorkerConfigTransformer NPE on connector configuration reloading (apache#6991)
  MINOR: add upgrade text (apache#7013)
  Bump version to 2.3.1-SNAPSHOT
xiowu0 pushed a commit to linkedin/kafka that referenced this pull request Aug 22, 2019
… breaks TimestampExtractor (apache#7054)

TICKET = KAFKA-8615
LI_DESCRIPTION =
EXIT_CRITERIA = HASH [3d7b989]
ORIGINAL_DESCRIPTION =

The timestamp extractor takes a previousTimestamp parameter which should be the partition time. This PR adds back in partition time tracking for the extractor, and renames previousTimestamp --> partitionTime

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>, Matthias J. Sax <mjsax@apache.org>
(cherry picked from commit 3d7b989)

4 participants