MINOR: Update usage of deprecated API#6146
Conversation
|
Call for review @guozhangwang @bbejeck @vvcephei |
| } finally { | ||
| if (kafkaAdminClient != null) { | ||
| kafkaAdminClient.close(60, TimeUnit.SECONDS); | ||
| kafkaAdminClient.close(java.time.Duration.ofSeconds(60)); |
There was a problem hiding this comment.
We cannot import java.time.Duration because we already import javax.xml.datatype.Duration.
I was wondering if we could get rid of javax.xml.datatype.Duration though and replace it with java.time.Duration that was not available in Java7 when we added javax.xml.datatype.Duration.
There was a problem hiding this comment.
The only place we need this duration is
duration.negate().addTo(now);
java.time.Duration has negated but its addTo is taking a Temporal, and all its extends like LocalTime etc do not have the right API of getTime. So I think we just keep it like this.
There was a problem hiding this comment.
We should remove the javax.xml.datatype.Duration usage since it's not part of the Java base module.
There was a problem hiding this comment.
I was locking into the corresponding KIPs
- https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling
- https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application
Both say:
--by-duration PnDTnHnMnS
Duration must be specified in ISO8601 format.
Looking into JavaDocs of java.time.Duration#parse() it says:
* Obtains a {@code Duration} from a text string such as {@code PnDTnHnMn.nS}.
* <p>
* This will parse a textual representation of a duration, including the
* string produced by {@code toString()}. The formats accepted are based
* on the ISO-8601 duration format {@code PnDTnHnMn.nS} with days
* considered to be exactly 24 hours.
Thus, it seems save to replace
final javax.xml.datatype.Duration durationParsed = DatatypeFactory.newInstance().newDuration(duration);
with
final java.time.Duration durationParsed = java.time.Duration.parse(duration);
Btw: we should do the same for kafka.admin.ConsumerGroupCommand (I can add this change to this PR).
Thoughts?
There was a problem hiding this comment.
@mjsax LGTM
Both kafka/tools/StreamsResetter.java and kafka/admin/ConsumerGroupCommand.scala should be migrated to remove usage of javax.xml.datatype.*
There was a problem hiding this comment.
@mjsax , I agree, it seems safe and desirable to swap them out.
|
Updated this. Can you please review again @guozhangwang |
| import java.util.stream.Collectors; | ||
|
|
||
| /** | ||
| * {@link StreamsResetter} resets the processing state of a Kafka Streams application so that, for example, you can reprocess its input from scratch. |
There was a problem hiding this comment.
Just shorting some too long lines here -- not content change.
| final String duration = options.valueOf(byDurationOption); | ||
| final Duration durationParsed = DatatypeFactory.newInstance().newDuration(duration); | ||
| resetByDuration(client, inputTopicPartitions, durationParsed); | ||
| resetByDuration(client, inputTopicPartitions, Duration.parse(duration)); |
There was a problem hiding this comment.
This is the actual update to move to java.time.Duration
| final long timestamp = now.getTime(); | ||
| final Instant now = Instant.now(); | ||
| duration.negated().addTo(now); | ||
| final long timestamp = now.toEpochMilli(); |
| val durationParsed = Duration.parse(duration) | ||
| val now = Instant.now() | ||
| durationParsed.negated().addTo(now) | ||
| val timestamp = now.toEpochMilli |
There was a problem hiding this comment.
Is this code covered by any tests? Duration and Instant are immutable, so I don't think this code would work.
What do you think about:
| val timestamp = now.toEpochMilli | |
| val timestamp = now.minus(durationParsed).toEpochMilli() |
(also applies to the same change below)
There was a problem hiding this comment.
Is this code covered by any tests?
Yes. Jenkins failed (I did not run the tests locally before I pushed...) -- Same issue in StreamsResetter.
|
retest this |
|
LGTM modulo @vvcephei 's comment. Please feel free to merge after comments addressed and jenkins passed |
|
Updates this. |
|
Retest this please |
* ak/trunk: MINOR: Update usage of deprecated API (apache#6146) KAFKA-4217: Add KStream.flatTransform (apache#5273) MINOR: Update Gradle to 5.1.1 (apache#6160) KAFKA-3522: Generalize Segments (apache#6170) Added quotes around the class path (apache#4469) KAFKA-7837: Ensure offline partitions are picked up as soon as possible when shrinking ISR (apache#6202) MINOR: In the MetadataResponse schema, ignorable should be a boolean KAFKA-7838: Log leader and follower end offsets when shrinking ISR (apache#6168) KAFKA-5692: Change PreferredReplicaLeaderElectionCommand to use Admin… (apache#3848) MINOR: clarify why suppress can sometimes drop tombstones (apache#6195) MINOR: Upgrade ducktape to 0.7.5 (apache#6197) MINOR: Improve IntegrationTestUtils documentation (apache#5664) MINOR: upgrade to jdk8 8u202 KAFKA-7693; Fix SequenceNumber overflow in producer (apache#5989) KAFKA-7692; Fix ProducerStateManager SequenceNumber overflow (apache#5990) MINOR: update copyright year in the NOTICE file. (apache#6196) KAFKA-7793: Improve the Trogdor command line. (apache#6133)
Reviewers: Guozhang Wang <guozhang@confluent.io>, Ismael Juma <ismael@confluent.io>, Jorge Quilcate Otoya <quilcate.jorge@gmail.com>, John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>
close(long, TimeUnit) was deprecated and replaced with close(Duration);