KAFKA-8863 - Add InsertHeader and DropHeader transforms for connect #7284

Closed
alozano3 wants to merge 24 commits into apache:trunk from alozano3:KAFKA-8863-connect-insertHeaders-dropHeaders

@alozano3
Contributor

@alozano3 alozano3 commented Sep 3, 2019

This PR implements the InsertHeader and DropHeaders SMTs described in KIP-145, extending the work done in PR #4319.

Unit tests are added for InsertHeader and ConnectHeaders, covering the different types of Header (int8, int16, int32, int64, float32, float64, Time, Date and Timestamp).

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@alozano3
Contributor Author

alozano3 commented Sep 3, 2019

Inside the method consumeNextToken() of the Values class, the string being parsed is tokenized by the character ":". So when I insert a header of the type Time with the pattern "HH:mm:ss.SSS'Z'" the string gets tokenized instead of remaining as a whole.

The consequence is that the string is treated as different integer values and is never parsed as a whole date inside the method parse(Parser parser, boolean embedded).

As a workaround, I insert the value with the colons escaped (e.g. "14\\:34\\:22.222Z") and then remove the "\" characters before parsing, because the length of the string is checked before it is parsed as a date.

Could you give me some advice on this, please?
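
To illustrate the tokenization problem described above, here is a minimal, self-contained sketch. It is not the code of the Kafka Values class: ColonTokenizerSketch and its tokenize method are hypothetical names, and the only behavior assumed from the discussion is that ':' acts as a delimiter unless it is escaped with a backslash.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; this is NOT the tokenizer in Kafka's Values class.
public class ColonTokenizerSketch {

    // Splits the input on ':' unless the colon is preceded by a backslash.
    // Each delimiter is returned as its own token. The escape character is
    // kept in the token, which is why it has to be removed again later.
    public static List<String> tokenize(String input) {
        List<String> tokens = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (int i = 0; i < input.length(); i++) {
            char c = input.charAt(i);
            if (c == '\\' && i + 1 < input.length()) {
                // Escaped character: keep both the backslash and the next char.
                current.append(c).append(input.charAt(++i));
            } else if (c == ':') {
                if (current.length() > 0) {
                    tokens.add(current.toString());
                    current.setLength(0);
                }
                tokens.add(":");
            } else {
                current.append(c);
            }
        }
        if (current.length() > 0) {
            tokens.add(current.toString());
        }
        return tokens;
    }

    public static void main(String[] args) {
        // Unescaped: the time is broken into pieces.
        System.out.println(tokenize("14:34:22.222Z"));     // [14, :, 34, :, 22.222Z]
        // Escaped: the time survives as one token, backslashes included.
        System.out.println(tokenize("14\\:34\\:22.222Z")); // [14\:34\:22.222Z]
    }
}
```

With the unescaped value, no single token holds the whole time, so a later "is this token a time?" check never sees it. With the escaped value the token is whole, but it still contains the backslashes.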

@alozano3 alozano3 changed the title [WIP] Add InsertHeader and DropHeader transforms for connect KAFKA-8863 - [WIP] Add InsertHeader and DropHeader transforms for connect Sep 4, 2019
@alozano3
Contributor Author

alozano3 commented Sep 4, 2019

retest this please

1 similar comment
@alozano3
Contributor Author

alozano3 commented Sep 5, 2019

retest this please

@alozano3
Contributor Author

alozano3 commented Sep 5, 2019

There is a flaky test org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated

org.junit.runners.model.TestTimedOutException: test timed out after 120000 milliseconds
    at java.base@11.0.1/jdk.internal.misc.Unsafe.park(Native Method)
    at java.base@11.0.1/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
    at java.base@11.0.1/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
    at java.base@11.0.1/java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1454)
    at java.base@11.0.1/java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:709)
    at app//org.apache.kafka.trogdor.rest.JsonRestServer.waitForShutdown(JsonRestServer.java:158)
    at app//org.apache.kafka.trogdor.agent.Agent.waitForShutdown(Agent.java:123)
    at app//org.apache.kafka.trogdor.common.MiniTrogdorCluster.close(MiniTrogdorCluster.java:285)
    at app//org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated(CoordinatorTest.java:596)
    at java.base@11.0.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base@11.0.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base@11.0.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base@11.0.1/java.lang.reflect.Method.invoke(Method.java:566)
    at app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
    at app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
    at java.base@11.0.1/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base@11.0.1/java.lang.Thread.run(Thread.java:834)

@alozano3
Contributor Author

alozano3 commented Sep 5, 2019

Another flaky test:
org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector

org.apache.kafka.connect.errors.DataException: Insufficient records committed by connector simple-conn in 30000 millis. Records expected=2000, actual=79
    at org.apache.kafka.connect.integration.ConnectorHandle.awaitCommits(ConnectorHandle.java:191)
    at org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector(ExampleConnectIntegrationTest.java:183)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

@alozano3
Contributor Author

alozano3 commented Sep 5, 2019

retest this please

@alozano3
Contributor Author

@rhauch for review.

@alozano3
Contributor Author

Should I just give the header a string schema instead of trying to parse the value in order to set the proper schema?

Contributor

@rhauch rhauch left a comment

Thanks for the contribution, @alozano3! Very nice work. There are a few behavioral changes that we can make, but overall this is very close.

@alozano3
Contributor Author

alozano3 commented Sep 23, 2019

@rhauch What do you think about the issue with the Time and Timestamp values that I previously posted?
The string gets tokenized in the method Values.consumeNextToken because it contains the character ':', so it is treated like a key:value field. I therefore have to escape the colons in the header value ("14\\:34\\:22.222Z") and then remove the escape characters in the Values.parse method:

// Check for a date, time, or timestamp ...
token = token.replace("\\", "");
int tokenLength = token.length();
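
The reason the escape characters must be removed before the length check can be sketched as follows. This is an illustration under assumptions, not the Values.parse implementation: EscapedTimeParseSketch, parseTime, and TIME_LENGTH are hypothetical names, the pattern is the "HH:mm:ss.SSS'Z'" mentioned earlier in this thread, and java.time is used here for convenience regardless of what the Values class uses internally.

```java
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Optional;

// Hypothetical illustration only; this is NOT Kafka's Values.parse.
public class EscapedTimeParseSketch {

    public static final String TIME_PATTERN = "HH:mm:ss.SSS'Z'";

    // The two single quotes around Z are pattern syntax and do not appear
    // in a formatted value, so a valid value is two characters shorter.
    public static final int TIME_LENGTH = TIME_PATTERN.length() - 2;

    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern(TIME_PATTERN);

    // Removes backslash escapes, then parses the token as a time only if
    // its length matches the expected length of a formatted time.
    public static Optional<LocalTime> parseTime(String token) {
        String unescaped = token.replace("\\", "");
        if (unescaped.length() != TIME_LENGTH) {
            return Optional.empty();
        }
        try {
            return Optional.of(LocalTime.parse(unescaped, FORMATTER));
        } catch (DateTimeParseException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String escaped = "14\\:34\\:22.222Z";
        // The escaped token is two characters longer than TIME_LENGTH, so a
        // length check on the raw token would reject it.
        System.out.println(escaped.length() + " vs " + TIME_LENGTH);
        System.out.println(parseTime(escaped));
    }
}
```

In this sketch the escaped token has 15 characters while a formatted time has 13, which is why a length check applied to the raw token would skip it.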

@alozano3 alozano3 requested a review from rhauch September 25, 2019 06:56
Contributor

@rhauch rhauch left a comment

Thanks, @alozano3. Looks great, though I have two pretty easy suggestions below.

@alozano3 alozano3 requested a review from rhauch October 4, 2019 07:03
Contributor

@rhauch rhauch left a comment

Thanks for the quick turnaround, @alozano3. A few more minor suggestions.

@rhauch
Contributor

rhauch commented Oct 4, 2019

Retest this, please.

alozano3 and others added 2 commits October 7, 2019 08:14
…sforms/InsertHeader.java

Co-Authored-By: Randall Hauch <rhauch@gmail.com>
…sforms/InsertHeader.java

Co-Authored-By: Randall Hauch <rhauch@gmail.com>
…sforms/InsertHeader.java

Co-Authored-By: Randall Hauch <rhauch@gmail.com>
@rhauch
Contributor

rhauch commented Oct 11, 2019

@alozano3 any update to my questions/comments?

@alozano3
Contributor Author

Sure! I have implemented and answered some of your suggestions, but I didn't have much time yesterday to finish them all. Hopefully I will get to the rest next week.

@rhauch
Contributor

rhauch commented Oct 11, 2019

@alozano3 we're getting close to code freeze, so the sooner the better.

@alozano3 alozano3 requested review from rhauch and removed request for rhauch February 5, 2020 15:25
Contributor

@rhauch rhauch left a comment

Thanks for the PR, @alozano3. The SMT implementations look pretty good, but unfortunately any changes to ConfigDef will require a KIP. My recommendation is to avoid making any changes to ConfigDef and keep the validator implementation specific to these SMTs.

Comment thread clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java Outdated
@alozano3
Contributor Author

Any update?

@alozano3
Contributor Author

alozano3 commented May 6, 2020

> Thanks for the PR, @alozano3. The SMT implementations look pretty good, but unfortunately any changes to ConfigDef will require a KIP. My recommendation is to avoid making any changes to ConfigDef and keep the validator implementation specific to these SMTs.

The change is done. Please review the changes.
Thanks.

@alozano3 alozano3 marked this pull request as draft June 30, 2020 14:41
@alozano3 alozano3 marked this pull request as ready for review June 30, 2020 14:41
@alozano3 alozano3 changed the title KAFKA-8863 - [WIP] Add InsertHeader and DropHeader transforms for connect KAFKA-8863 - Add InsertHeader and DropHeader transforms for connect Apr 12, 2021
@tombentley
Member

@alozano3 I have PR #9549 open for these SMTs (plus HeaderFrom) and was hoping to merge them this week pending a 2nd pass from @mimaison.

@alozano3
Contributor Author

> @alozano3 I have PR #9549 open for these SMTs (plus HeaderFrom) and was hoping to merge them this week pending a 2nd pass from @mimaison.

Great, because I have not received feedback for over a year.

@alozano3
Contributor Author

I am closing this PR because #9549 has been merged.

@alozano3 alozano3 closed this Apr 19, 2021
@alozano3 alozano3 deleted the KAFKA-8863-connect-insertHeaders-dropHeaders branch April 19, 2021 12:00
4 participants