Skip to content

fix SequenceMetadata deserialization#7256

Merged
gianm merged 5 commits intoapache:masterfrom
clintropolis:fix-sequence-metadata
Mar 13, 2019
Merged

fix SequenceMetadata deserialization#7256
gianm merged 5 commits intoapache:masterfrom
clintropolis:fix-sequence-metadata

Conversation

@clintropolis
Copy link
Copy Markdown
Member

Fixes #7252.

I don't love this fix, am totally open to other ideas. This PR adds an abstract method to SeekableStreamIndexTaskRunner

abstract TypeReference<List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>> getSequenceMetadataTypeReference();

so that sub classes can create the correct TypeReference to allow deserialization of SequenceMetadata during task restore to function. This also means SequenceMetadata has been pulled out of SeekableStreamIndexTaskRunner and given the same generic parameters of PartitionIdType and SequenceOffsetType. This was sort of ugly because SequenceMetadata was calling methods on it's parent SeekableStreamIndexTaskRunner, so those methods now take a runner as an argument.

Also fixed is an issue where a resumed task that was at the end offset would not correctly end the task, resulting in what was afaict a task stuck in it's read loop forever.

@gianm
Copy link
Copy Markdown
Contributor

gianm commented Mar 13, 2019

I don't love this fix, am totally open to other ideas.

What don't you love about it?

This also means SequenceMetadata has been pulled out of SeekableStreamIndexTaskRunner and given the same generic parameters of PartitionIdType and SequenceOffsetType. This was sort of ugly because SequenceMetadata was calling methods on it's parent SeekableStreamIndexTaskRunner, so those methods now take a runner as an argument.

I haven't read the patch yet (will soon), but, I don't necessarily think that this kind of change is ugly! The new structure might even be better. The SequenceMetadata in master looks at first like a simple state class, but it actually has methods that modify the runner it came from, which isn't intuitive to me. Making it a non-inner class and explicitly passing in the runner could make that clearer.

Also fixed is an issue where a resumed task that was at the end offset would not correctly end the task, resulting in what was afaict a task stuck in it's read loop forever.

Is this related to #7252 or a separate thing you fixed opportunistically?

if (isEndSequenceOffsetsExclusive() &&
createSequenceNumber(record.getSequenceNumber()).compareTo(
createSequenceNumber(endOffsets.get(record.getPartitionId()))) >= 0) {
stillReading = false;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is the change to fix the bug of stuck on resume. But, it looks to be better to fix the assignPartitions method which is the method to check how many offsets are remaining per partition. This method is also called before starting the read loop.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, thanks, will update 👍

if (isEndSequenceOffsetsExclusive() &&
createSequenceNumber(record.getSequenceNumber()).compareTo(
createSequenceNumber(endOffsets.get(record.getPartitionId()))) >= 0) {
stillReading = false;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stillReading should only be set to false when our assignment is empty (no partitions left to read). Hitting the end offset for one partition doesn't mean we should stop reading all partitions.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried removing this line and the tests all still passed. Did you have a test that failed without this line being here?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, yes I have a test case to replicate this, I ended up modifying the test to not hit the condition anymore before I determined the issue, will add the test back. This was an attempt to fix opportunistically after choosing a test case that happened to hit the issue.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to split this out into a separate PR.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

@clintropolis
Copy link
Copy Markdown
Member Author

Thanks for review @gianm and @jihoonson, will open a follow up PR with the fix for restoring a task that only needs to publish and is already at it's end offset.

What don't you love about it?

I don't especially like the abstract method that returns a TypeReference to tell it how to serialize the things, but I guess it's ok compared to the alternatives I can think of like custom json deserializer or creating ParameterizedType with reflection. I do agree that it's nice that pulling out SequenceMetadata does make it a bit more clear the coupling between it and SeekableStreamIndexTaskRunner, I think there is probably some room for refactoring. I can't help but wonder if maybe SeekableStreamIndexTaskRunner should not have been abstract, and rather all of it's abstract methods usage instead be delegating to stream specific types that it encapsulates to do all the things like handle offset comparisons, serde, control points etc, but I haven't quite nailed down what this should look like.

@gianm
Copy link
Copy Markdown
Contributor

gianm commented Mar 13, 2019

I don't especially like the abstract method that returns a TypeReference to tell it how to serialize the things, but I guess it's ok compared to the alternatives I can think of like custom json deserializer or creating ParameterizedType with reflection. I do agree that it's nice that pulling out SequenceMetadata does make it a bit more clear the coupling between it and SeekableStreamIndexTaskRunner, I think there is probably some room for refactoring. I can't help but wonder if maybe SeekableStreamIndexTaskRunner should not have been abstract, and rather all of it's abstract methods usage instead be delegating to stream specific types that it encapsulates to do all the things like handle offset comparisons, serde, control points etc, but I haven't quite nailed down what this should look like.

Ah. I think the TypeReference based thing you did is pretty reasonable compared to the alternatives. TaskAction does something similar. So that makes me feel better :)

Copy link
Copy Markdown
Contributor

@gianm gianm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@gianm gianm merged commit fb1489d into apache:master Mar 13, 2019
@clintropolis clintropolis added this to the 0.14.0 milestone Mar 13, 2019
@clintropolis clintropolis deleted the fix-sequence-metadata branch March 13, 2019 22:13
clintropolis added a commit to clintropolis/druid that referenced this pull request Mar 14, 2019
* wip

* fix tests, stop reading if we are at end offset

* fix build

* remove restore at end offsets fix in favor of a separate PR

* use typereference from method for serialization too
fjy pushed a commit that referenced this pull request Mar 14, 2019
* wip

* fix tests, stop reading if we are at end offset

* fix build

* remove restore at end offsets fix in favor of a separate PR

* use typereference from method for serialization too
gianm pushed a commit to implydata/druid-public that referenced this pull request Mar 14, 2019
* wip

* fix tests, stop reading if we are at end offset

* fix build

* remove restore at end offsets fix in favor of a separate PR

* use typereference from method for serialization too
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Kafka Tasks unable restore persisted segments properly

3 participants