Skip to content

KAFKA-8065: restore original input record timestamp in forward()#6393

Merged
mjsax merged 1 commit intoapache:trunkfrom
mjsax:minor-fix-timestamp-forwarding
Mar 9, 2019
Merged

KAFKA-8065: restore original input record timestamp in forward()#6393
mjsax merged 1 commit intoapache:trunkfrom
mjsax:minor-fix-timestamp-forwarding

Conversation

@mjsax
Copy link
Copy Markdown
Member

@mjsax mjsax commented Mar 7, 2019

More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.

Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@mjsax mjsax added the streams label Mar 7, 2019
@mjsax
Copy link
Copy Markdown
Member Author

mjsax commented Mar 7, 2019

Call for review @guozhangwang @bbejeck @vvcephei @ableegoldman

driver.pipeInput(recordFactory.create(INPUT_TOPIC_1, "key1", "value1", 10L));
assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 10L);
assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1", partition, 20L);
assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition, 15L);
Copy link
Copy Markdown
Member Author

@mjsax mjsax Mar 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without the fix, this fails, because output record ts is 25.

And the following timestamps below would be: 35, 37, 47 (it's accumulating also for "deep" modifications)

Copy link
Copy Markdown
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fix @mjsax, LGTM.

@mjsax
Copy link
Copy Markdown
Member Author

mjsax commented Mar 7, 2019

Java11 failed with some environmental issue.

Retest this please

Copy link
Copy Markdown
Contributor

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fix, @mjsax . I just had one question.

@bbejeck
Copy link
Copy Markdown
Member

bbejeck commented Mar 8, 2019

Java 8 passed, Java 11 failed with org.apache.kafka.connect.runtime.WorkerTest.testConverterOverrides

retest this please

forward(child, key, value);
}
} finally {
recordContext.timestamp = currentTimestamp;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider saving/restoring the whole record context, just in case downstream processors have changed other parts of the context, not just the timestamp?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Headers work differently. Users would mutate the input record header so they understand that the header changes.

We can still address it (I agree that the current header API is not great) but I would exclude it from this PR. Feel free to create a ticket for it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, this is a good point. I was thinking that downstream processors were directly manipulating the record context, but if they do that, it's clearly their responsibility to restore it when they're done.

I think I won't bother with a ticket right now, it seems like the mutable-headers thing is actually just part of the key and value also being mutable. Plus, there's a bunch of other stuff that's not well defined about header handling in Streams. It seems like altogether too much to scope in a ticket without investing some serious design work up front.

I think the current change is fine as-is. Thanks!

@mjsax
Copy link
Copy Markdown
Member Author

mjsax commented Mar 8, 2019

Jenkins env failure on Java11. Java8 passed.

Retest this please.

@vvcephei
Copy link
Copy Markdown
Contributor

vvcephei commented Mar 8, 2019

LGTM. I'm 👍

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@mjsax mjsax merged commit 09f1009 into apache:trunk Mar 9, 2019
@mjsax mjsax deleted the minor-fix-timestamp-forwarding branch March 9, 2019 03:24
mjsax added a commit that referenced this pull request Mar 9, 2019
Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
mjsax added a commit that referenced this pull request Mar 9, 2019
Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
mjsax added a commit that referenced this pull request Mar 9, 2019
Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
@mjsax
Copy link
Copy Markdown
Member Author

mjsax commented Mar 9, 2019

Merged to trunk and cherry-picked to 2.2, 2.1, and 2.0.

jarekr pushed a commit to confluentinc/kafka that referenced this pull request Apr 18, 2019
* warn-apache-kafka/trunk: (41 commits)
  MINOR: Avoid double null check in KStream#transform() (apache#6429)
  KAFKA-7944: Improve Suppress test coverage (apache#6382)
  KAFKA-3522: add missing guards for TimestampedXxxStore (apache#6356)
  MINOR: Change Trogdor agent's cleanup executor to a cached thread pool (apache#6309)
  KAFKA-7976; Update config before notifying controller of unclean leader update (apache#6426)
  KAFKA-7801: TopicCommand should not be able to alter transaction topic partition count
  KAFKA-8091; Wait for processor shutdown before testing removed listeners (apache#6425)
  MINOR: Update delete topics zk path in assertion error messages
  KAFKA-7939: Fix timing issue in KafkaAdminClientTest.testCreateTopicsRetryBackoff
  KAFKA-7922: Return authorized operations in Metadata request response (KIP-430 Part-2)
  MINOR: Print usage when parse fails during console producer
  MINOR: fix Scala compiler warning (apache#6417)
  KAFKA-7288; Fix check in SelectorTest to wait for no buffered bytes (apache#6415)
  KAFKA-8065: restore original input record timestamp in forward() (apache#6393)
  MINOR: cleanup deprectaion annotations (apache#6290)
  KAFKA-3522: Add TimestampedWindowStore builder/runtime classes (apache#6173)
  KAFKA-8069; Fix early expiration of offsets due to invalid loading of expire timestamp (apache#6401)
  KAFKA-8070: Increase consumer startup timeout in system tests (apache#6405)
  KAFKA-8040: Streams handle initTransactions timeout (apache#6372)
  KAFKA-7980 - Fix timing issue in SocketServerTest.testConnectionRateLimit (apache#6391)
  ...
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…che#6393)

Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants