MINOR: Add timer for update limit offsets#8047
Conversation
| /** | ||
| * Update offset limit of a given changelog partition | ||
| */ | ||
| void updateLimitOffsets(); |
There was a problem hiding this comment.
This function is only triggered internally and can be removed from interface.
|
|
||
| // only for standby tasks that use source topics as changelogs (for active it is null); | ||
| // if it is not on source topics it is also null | ||
| private Long restoreLimitOffset; |
There was a problem hiding this comment.
Following the suggestion from KAFKA-9113 PR, by @ableegoldman we consolidate the limit-offset with end-offset.
| this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)); | ||
| this.pollTimeMs = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)); | ||
| this.updateOffsetIntervalMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) == Long.MAX_VALUE ? | ||
| DEFAULT_OFFSET_UPDATE_MS : config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); |
There was a problem hiding this comment.
If user set this value to infinity we should still have a non-inf value to take care of the manual commit.
| throw new StreamsException(String.format("Failed to retrieve end offsets for %s", partitions), e); | ||
| } | ||
|
|
||
| lastUpdateOffsetTime = time.milliseconds(); |
There was a problem hiding this comment.
If there's nothing to be updated or timed out, we do not update the timer.
There was a problem hiding this comment.
This would be less mysterious if this method were inlined into updateLimitOffsets. Right now, it's not terribly clear why it's ok to set the "last update offset time" in a method that doesn't update the offsets.
There was a problem hiding this comment.
This function is triggered by another caller besides updateLimitOffsets, plus it is a bit close to the NPathComplexity threshold..
| // in order to make sure we call the main consumer#poll in time. | ||
| // TODO: once both of these are moved to a separate thread this may no longer be a concern | ||
| this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)); | ||
| this.pollTimeMs = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)); |
There was a problem hiding this comment.
the type of pollTimeMs is Duration. It seems to me that the "ms" is a bit redundant.
|
retest this please |
…hangelog-reader-limit-offset-update
|
Thanks @guozhangwang ! This looks good to me. I left one relatively minor remark, I leave to you whether you want to take the suggestion or not. |
|
retest this please |
1 similar comment
|
retest this please |
Conflicts: * build.gradle: moved avro plugin definition below newly added test retry plugin. * apache-github/trunk: MINOR: further InternalTopologyBuilder cleanup (apache#8046) MINOR: Add timer for update limit offsets (apache#8047) HOTFIX: Fix spotsbug failure in Kafka examples (apache#8051) KAFKA-9447: Add new customized EOS model example (apache#8031) KAFKA-8164: Add support for retrying failed (apache#8019) HOTFIX: checkstyle for newly added unit test KAFKA-9261; Client should handle unavailable leader metadata (apache#7770) MINOR: Fix typos introduced in KIP-559 (apache#8042) MINOR: Fixing null handilg in ValueAndTimestampSerializer (apache#7679) KAFKA-9113: Clean up task management and state management (apache#7997) MINOR: fix checkstyle issue in ConsumerConfig.java (apache#8038) KAFKA-9491; Increment high watermark after full log truncation (apache#8037) KAFKA-9477 Document RoundRobinAssignor as an option for partition.assignment.strategy (apache#8007) KAFKA-9074: Correct Connect’s `Values.parseString` to properly parse a time and timestamp literal (apache#7568) KAFKA-9492; Ignore record errors in ProduceResponse for older versions (apache#8030)
…t-for-generated-requests * apache-github/trunk: (410 commits) KAFKA-8843: KIP-515: Zookeeper TLS support MINOR: Add missing quote for malformed line content (apache#8070) MINOR: Simplify KafkaProducerTest (apache#8044) KAFKA-9507; AdminClient should check for missing committed offsets (apache#8057) KAFKA-9519: Deprecate the --zookeeper flag in ConfigCommand (apache#8056) KAFKA-9509; Fixing flakiness of MirrorConnectorsIntegrationTest.testReplication (apache#8048) HOTFIX: Fix two test failures in JDK11 (apache#8063) DOCS - clarify transactionalID and idempotent behavior (apache#7821) MINOR: further InternalTopologyBuilder cleanup (apache#8046) MINOR: Add timer for update limit offsets (apache#8047) HOTFIX: Fix spotsbug failure in Kafka examples (apache#8051) KAFKA-9447: Add new customized EOS model example (apache#8031) KAFKA-8164: Add support for retrying failed (apache#8019) HOTFIX: checkstyle for newly added unit test KAFKA-9261; Client should handle unavailable leader metadata (apache#7770) MINOR: Fix typos introduced in KIP-559 (apache#8042) MINOR: Fixing null handilg in ValueAndTimestampSerializer (apache#7679) KAFKA-9113: Clean up task management and state management (apache#7997) MINOR: fix checkstyle issue in ConsumerConfig.java (apache#8038) KAFKA-9491; Increment high watermark after full log truncation (apache#8037) ...
Instead of always try to update committed offset limits as long as there are buffered records for standby tasks, we leverage on the commit interval to reduce our
consumer.committedfrequency.Committer Checklist (excluded from commit message)