Skip to content

KAFKA-8816: Make offsets immutable to users of RecordCollector.offsets#7223

Merged
guozhangwang merged 4 commits intoapache:trunkfrom
cpettitt-confluent:kafka-8816
Aug 26, 2019
Merged

KAFKA-8816: Make offsets immutable to users of RecordCollector.offsets#7223
guozhangwang merged 4 commits intoapache:trunkfrom
cpettitt-confluent:kafka-8816

Conversation

@cpettitt-confluent
Copy link
Copy Markdown
Contributor

Make offsets immutable to users of RecordCollector.offsets. Fix up an
existing case where offsets could be modified in this way. Add a simple
test to verify offsets cannot be changed externally.

More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.

Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@cpettitt-confluent
Copy link
Copy Markdown
Contributor Author

This should have the "streams" label, but I don't seem to have enough permissions to set it.

@mjsax mjsax added the streams label Aug 20, 2019
Copy link
Copy Markdown
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I agree in general, that RecordCollectorImpl#offsets is conceptually a private field, I am not sure if I understand why there is a bug. RecordCollectorImpl never reads offsets but only blindly updates the map on write in the send() callback. Hence, this PR does not seem toy change the end-of-end behavior? Can you elaborate?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be simpler to hand out a deep copy of the map directly?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be totally fine for the current usage pattern, where we have a single query that ends up copying the map anyway. However, if we ever ended up with other queries then those queries would pay the cost of the copy whether they need it or not. It would be a bit surprising that a copy is happening without looking at the implementation. My bias would be towards defensive coding without surprise performance impact.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seem to be unused?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, copy-paste error. This can be removed.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use assertThrows instead of try-fail-catch construct.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! It's been a while since I've coded Java and I forgot about assertThrows.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use assertThat instead

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do for both instances.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as below.

Copy link
Copy Markdown
Contributor Author

@cpettitt-confluent cpettitt-confluent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will follow up with a patch to address your comments in test. If you feel strongly about copying the map in offsets let me know and I will make that change; otherwise I'll leave that as is.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, copy-paste error. This can be removed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do for both instances.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! It's been a while since I've coded Java and I forgot about assertThrows.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be totally fine for the current usage pattern, where we have a single query that ends up copying the map anyway. However, if we ever ended up with other queries then those queries would pay the cost of the copy whether they need it or not. It would be a bit surprising that a copy is happening without looking at the implementation. My bias would be towards defensive coding without surprise performance impact.

@cpettitt-confluent
Copy link
Copy Markdown
Contributor Author

Re. why is this a bug, based on my understanding of the code:

Per the doc on RecordCollector the contract is that the value returned from offsets is the latest acks from the producer. Prior to this change it was possible to get the offsets map and change it directly, with no involvement of a producer. That seems to be a violation of the documentation and my understanding of the intent of the class. Anybody can get the offsets map, do some manipulation to it for its own purpose and accidentally change the internal state of RecordCollector in a non-obvious way. In fact, that is what is happening from StreamTask during commit prior to this patch.

@cpettitt-confluent
Copy link
Copy Markdown
Contributor Author

BTW, thanks for the review @mjsax and for reminding me about assertThrows. I'll get a new patch up tomorrow.

@cpettitt-confluent
Copy link
Copy Markdown
Contributor Author

New commit is up and ready for review. I couldn't tell if you prefer force push or squash at merge and I see both in other PRs. I opted for squash at merge, but if you prefer force push let me know and I will make it so :).

@cpettitt-confluent
Copy link
Copy Markdown
Contributor Author

cpettitt-confluent commented Aug 20, 2019

W.r.t observable behavior change, I reran with this patch and without and here is a difference in how checkpointing works:

Good:

[2019-08-19 15:59:34,197] INFO task [0_0] *** Writing checkpoint: {input-topic-0=15} (org.apache.kafka.streams.processor.internals.ProcessorStateManager:354)
[2019-08-19 15:59:34,332] INFO task [0_0] *** Writing checkpoint: {input-topic-0=35} (org.apache.kafka.streams.processor.internals.ProcessorStateManager:354)
[2019-08-19 15:59:34,443] INFO task [0_0] *** Writing checkpoint: {input-topic-0=53} (org.apache.kafka.streams.processor.internals.ProcessorStateManager:354)

Bad:

[2019-08-20 09:28:39,009] INFO task [0_0] *** Writing checkpoint: {input-topic-0=15} (org.apache.kafka.streams.processor.internals.ProcessorStateManager:354)
[2019-08-20 09:28:39,153] INFO task [0_0] *** Writing checkpoint: {input-topic-0=15} (org.apache.kafka.streams.processor.internals.ProcessorStateManager:354)
[2019-08-20 09:28:39,264] INFO task [0_0] *** Writing checkpoint: {input-topic-0=15} (org.apache.kafka.streams.processor.internals.ProcessorStateManager:355)

In the bad case the consumed offset is not checkpointed because we added the first value we saw to the record collector and never update it (putIfAbsent):

    @Override
    protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
        final Map<TopicPartition, Long> checkpointableOffsets = recordCollector.offsets();
        for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
            checkpointableOffsets.putIfAbsent(entry.getKey(), entry.getValue());
        }

        return checkpointableOffsets;
    }

I have no idea if that manifests in bad behavior, but it doesn't look right and doesn't match the behavior in the non-optimized graph where the checkpoints increase for the changelog.

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Aug 21, 2019

Thanks for the details.

I have no idea if that manifests in bad behavior, but it doesn't look right and doesn't match the behavior in the non-optimized graph where the checkpoints increase for the changelog.

It does not sound like a correctness issue, but a performance issue. If the checkpoint does not advance, on restore KS would re-read/re-play more data than necessary from the changelog.

Copy link
Copy Markdown
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Aug 21, 2019

MIght be worth to back-port this fix to older versions... Maybe back to 2.0? (or even 1.0?)

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Aug 21, 2019

Java 11 / 2.12 and 2.13 passed.
Java8: kafka.api.SaslOAuthBearerSslEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl

Retest this please

Make offsets immutable to users of RecordCollector.offsets. Fix up an
existing case where offsets could be modified in this way. Add a simple
test to verify offsets cannot be changed externally.
@cpettitt-confluent
Copy link
Copy Markdown
Contributor Author

Seems like flaky tests. The first test passed this time and we picked up two connect test failures which were not there previously.

retest this please

Copy link
Copy Markdown
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cpettitt-confluent, Thank you for the PR.

Here my feedback.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This last check is not needed, since it verifies functionality of the Map returned by Collections.unmodifiableMap() and not of the code under test.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove ().

@cadonna
Copy link
Copy Markdown
Member

cadonna commented Aug 26, 2019

FYI: Openend PR #7253 for a minor refactoring to activeTaskCheckpointableOffsets(). The PR should not conflict with this one. Reviews welcome.

@cadonna
Copy link
Copy Markdown
Member

cadonna commented Aug 26, 2019

Retest this, please

@cpettitt-confluent
Copy link
Copy Markdown
Contributor Author

Thanks for the feedback @cadonna! Good call on checking the map a second time. I will requery the offset directly from the collector, which more correctly completes verification. Patch coming shortly.

@cpettitt-confluent
Copy link
Copy Markdown
Contributor Author

Ugh. Somehow my origin moved so this is pulling in a lot of irrelevant stuff. Let me see if I can clean it up.

@cpettitt-confluent
Copy link
Copy Markdown
Contributor Author

All better.

Retest this please.


assertThat(offsets.get(topicPartition), equalTo(2L));
assertThrows(UnsupportedOperationException.class, () -> offsets.put(new TopicPartition(topic, 0), 50L));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would remove this line

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would argue for keeping this because the change impacts the external behavior of the class. We're making a strong statement here: you will get an exception if you try to modify the contents of the returned map, you must copy this map if you want to make changes. This also distinguishes from the alternative approach we could have used in which we proactively copy the map for the user and where the user could have made a change to the map while still not impacting the underlying map. Given that this is externally facing and there is doc a couple levels up, I will fix that up.

Happy to discuss further if you strongly disagree.

assertThat(offsets.get(topicPartition), equalTo(2L));
assertThrows(UnsupportedOperationException.class, () -> offsets.put(new TopicPartition(topic, 0), 50L));

// Verify that collector offsets were not updated
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would remove this comment because the code it comments is clear enough.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hah, this was probably for me more than anyone to highlight the subtle difference between this and the previous query. I'm fine pulling it out and since I have a doc change coming, I will make it so :).

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Sep 4, 2019

@guozhangwang Should we cherry-pick this to older branches? I would at least cherry-pick to 2.3, but maybe even back to 2.0. Thoughts?

guozhangwang pushed a commit that referenced this pull request Sep 9, 2019
#7223)

Make offsets immutable to users of RecordCollector.offsets. Fix up an
existing case where offsets could be modified in this way. Add a simple
test to verify offsets cannot be changed externally.

Reviewers: Bruno Cadonna <bruno@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
guozhangwang pushed a commit that referenced this pull request Sep 9, 2019
#7223)

Make offsets immutable to users of RecordCollector.offsets. Fix up an
existing case where offsets could be modified in this way. Add a simple
test to verify offsets cannot be changed externally.

Reviewers: Bruno Cadonna <bruno@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
guozhangwang pushed a commit that referenced this pull request Sep 9, 2019
#7223)

Make offsets immutable to users of RecordCollector.offsets. Fix up an
existing case where offsets could be modified in this way. Add a simple
test to verify offsets cannot be changed externally.

Reviewers: Bruno Cadonna <bruno@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
guozhangwang pushed a commit that referenced this pull request Sep 9, 2019
#7223)

Make offsets immutable to users of RecordCollector.offsets. Fix up an
existing case where offsets could be modified in this way. Add a simple
test to verify offsets cannot be changed externally.

Reviewers: Bruno Cadonna <bruno@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
@guozhangwang
Copy link
Copy Markdown
Contributor

Cherry-picked all the way to 2.0.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants