Kafka with topicPattern can ignore old offsets spuriously #16190

zachjsh merged 18 commits into apache:master
Conversation
```java
}

@Override
protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
```
Instead of overriding this method, you should just override the checkSourceMetadataMatch method.
or rename this method appropriately so that callers know that it's also filtering the spurious stored offsets
Doesn't this fail when updateDataSourceMetadataWithHandle is called later on since that too will match the committed metadata with the new metadata?
Updated the logic in KafkaDataSourceMetadata so that it handles topicPattern during the matching check in updateDataSourceMetadataWithHandle. Thanks for pointing this out.
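As a rough illustration of the pattern-aware matching discussed here (illustrative names only, not the actual KafkaDataSourceMetadata implementation), a stored topic can be checked against the supervisor's configured stream like this:

```java
import java.util.regex.Pattern;

// Sketch of pattern-aware topic matching; class and method names are hypothetical.
public class TopicMatchSketch {
    // A multi-topic supervisor matches a topic via regex; a single-topic
    // supervisor matches via exact equality on the configured stream name.
    static boolean topicMatches(String storedTopic, String configuredStream, boolean multiTopic) {
        return multiTopic
               ? Pattern.compile(configuredStream).matcher(storedTopic).matches()
               : configuredStream.equals(storedTopic);
    }

    public static void main(String[] args) {
        System.out.println(topicMatches("metrics-1", "metrics-.*", true));  // regex match
        System.out.println(topicMatches("metrics-1", "metrics-1", false));  // exact match
        System.out.println(topicMatches("other", "metrics-.*", true));      // no match
    }
}
```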
```java
    : getIoConfig().getStream().equals(matchValue);

if (!match && !topicMisMatchLogged.contains(matchValue)) {
  log.warn(
```
This could happen when going from multi-topic to single-topic? Will these bad offsets get cleared automatically?
Going from multi-topic to single-topic: if the multi-topic sequence numbers contained offsets for streams that do not match the single topic name, those sequence offsets are removed from the new metadata state. However, this causes the matches method in the Kafka metadata to return false, which ultimately leads to failure to publish segments, just as going from one single topic to another would when sequence offsets were stored for the first topic. I think this is the behavior we want: users should be explicit when sequence offsets are lost due to a config change, and should be forced to reset the respective counters, imo. Let me know what you think.
Yeah, having the user reset the offsets explicitly in this scenario when there's no match at all makes sense to me.
```java
    ? pattern.matcher(matchValue).matches()
    : getIoConfig().getStream().equals(matchValue);

if (!match && !topicMisMatchLogged.contains(matchValue)) {
```
The function seems a bit complex to follow, and there's a risk of introducing bugs due to the multiple if...else conditionals intertwined with the multi/single-topic logic. Could we add a separate block, or a helper function, for each of the multi-topic and single-topic paths?
Simplified; let me know if it's OK now.
```java
{
  return spec.getTuningConfig();
}
```
A brief javadoc for this function would be useful and would clarify the need for this override from the base implementation and the filtering behavior
```java
if (this.getClass() != other.getClass()) {
  throw new IAE(
      "Expected instance of %s, got %s",
      this.getClass().getName(),
      other.getClass().getName()
  );
}
```
nit: adding a function validateSequenceNumbersBaseType() in the base class should remove this code duplication in 4-5 places
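A minimal sketch of the suggested base-class helper (the method name validateSequenceNumbersBaseType comes from the comment above; the IAE stand-in and everything else is illustrative, not Druid's actual class hierarchy):

```java
// Hypothetical deduplication of the repeated getClass() check into one helper.
public class SequenceValidatorSketch {
    // Stand-in for Druid's org.apache.druid.java.util.common.IAE.
    static class IAE extends IllegalArgumentException {
        IAE(String fmt, Object... args) { super(String.format(fmt, args)); }
    }

    // Base-class helper replacing the duplicated check in 4-5 call sites.
    static void validateSequenceNumbersBaseType(Object self, Object other) {
        if (self.getClass() != other.getClass()) {
            throw new IAE(
                "Expected instance of %s, got %s",
                self.getClass().getName(),
                other.getClass().getName()
            );
        }
    }

    public static void main(String[] args) {
        validateSequenceNumbersBaseType("a", "b");   // same class: no exception
        try {
            validateSequenceNumbersBaseType("a", 1); // String vs Integer: throws
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```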
```java
import java.util.regex.Pattern;
import java.util.stream.Collectors;

@JsonTypeName(SeekableStreamEndSequenceNumbers.TYPE)
```
Same here. How is this jackson type information used?
Added information about serialization to the javadoc; let me know if it's OK.
AmatyaAvadhanula left a comment:

Overall logic and tests LGTM!
```java
this.spec = spec;
this.emitter = spec.getEmitter();
this.monitorSchedulerConfig = spec.getMonitorSchedulerConfig();
this.pattern = getIoConfig().isMultiTopic() ? Pattern.compile(getIoConfig().getStream()) : null;
```

Check failure (Code scanning / CodeQL): Regular expression injection
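For context on the CodeQL finding: the scanner flags compiling user-supplied config as a regex. For topicPattern the regex behavior is intentional, so this is only a sketch of the distinction CodeQL is drawing; Pattern.quote is the stdlib way to treat input literally when a regex is not intended:

```java
import java.util.regex.Pattern;

// Illustrates regex-vs-literal treatment of a user-supplied string.
public class RegexInjectionSketch {
    public static void main(String[] args) {
        String userInput = "metrics-.*";
        // Treated as a regex (the intended multi-topic behavior):
        System.out.println(Pattern.compile(userInput).matcher("metrics-1").matches()); // true
        // Treated literally via Pattern.quote (what CodeQL would prefer for untrusted input):
        System.out.println(Pattern.compile(Pattern.quote(userInput)).matcher("metrics-1").matches()); // false
    }
}
```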
Fixes #16189
Description
This fixes an issue in which updating a Kafka streaming supervisor's topic from single-topic to multi-topic (pattern), or vice versa, could cause old offsets to be ignored spuriously. The details of the bug and how it manifests are in the linked issue. To fix the issue, this change properly handles reuse of previously found partition offsets in every combination: offsets recorded and stored by a streaming supervisor with multi-topic enabled or disabled, picked up by a new streaming supervisor with multi-topic disabled or enabled.
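A rough sketch of the filtering behavior the fix describes, using illustrative names and plain String topic keys instead of Druid's actual KafkaTopicPartition types: when loading previously stored offsets, keep only the entries whose topic matches the supervisor's current single topic or pattern.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch, not the actual Druid supervisor code.
public class OffsetFilterSketch {
    static Map<String, Long> filterStoredOffsets(Map<String, Long> stored,
                                                 String configuredStream,
                                                 boolean multiTopic) {
        Pattern pattern = multiTopic ? Pattern.compile(configuredStream) : null;
        Map<String, Long> filtered = new HashMap<>();
        for (Map.Entry<String, Long> e : stored.entrySet()) {
            // Multi-topic supervisors match via regex, single-topic via equality.
            boolean match = multiTopic
                            ? pattern.matcher(e.getKey()).matches()
                            : configuredStream.equals(e.getKey());
            if (match) {
                filtered.put(e.getKey(), e.getValue());
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        Map<String, Long> stored = new HashMap<>();
        stored.put("metrics-1", 100L);
        stored.put("logs-1", 50L);
        // Switching to the multi-topic pattern "metrics-.*" keeps only matching topics.
        System.out.println(filterStoredOffsets(stored, "metrics-.*", true)); // {metrics-1=100}
    }
}
```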
This PR has: