Skip to content

KAFKA-6970: All standard state stores guarded with read only wrapper#6016

Merged
mjsax merged 7 commits intoapache:trunkfrom
nizhikov:KAFKA-6970
Dec 11, 2018
Merged

KAFKA-6970: All standard state stores guarded with read only wrapper#6016
mjsax merged 7 commits intoapache:trunkfrom
nizhikov:KAFKA-6970

Conversation

@nizhikov
Copy link
Copy Markdown
Contributor

@nizhikov nizhikov commented Dec 8, 2018

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@mjsax mjsax added the streams label Dec 8, 2018
@mjsax
Copy link
Copy Markdown
Member

mjsax commented Dec 8, 2018

Tests fail with

java.lang.UnsupportedOperationException: Global store is read only

Seems this PR breaks something.

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Dec 8, 2018

Also note: only init() and close() should be guarded. Regular read/write access must be preserved.

@vvcephei
Copy link
Copy Markdown
Contributor

vvcephei commented Dec 9, 2018

Hey @nizhikov ,

Thanks for picking this up!

After reading the ticket, I think the current change doesn't quite do what we want...

James (the reporter) mentioned this method being used for getting a state store for use in Transformer and Processor, both of which need read-write access to the store. It sounds like the idea is to protect users from calling methods that are intended only to be called from the framework.

This is similar to the KAFKA-7420, as you mentioned in the ticket, but local state stores may safely be read/write, where as global state stores must be read-only.

Plus, James specifically called out protecting init() and close(), neither of which are protected by the current decorators.

I'd propose:

  1. throw UnsupportedOperationException in StateStoreReadOnlyDecorator methods init and close
  2. change the ERROR_MESSAGE to "This method may only be called by Kafka Steams"
  3. add new XXReadWriteDecorator classes for the three kinds of stores
  4. use the ReadOnly decorators for global stores and ReadWrite decorators for local stores

How does that sound?

(edit) I didn't refresh, so I didn't see @mjsax 's prior comments... sorry about the overlap.

@nizhikov
Copy link
Copy Markdown
Contributor Author

nizhikov commented Dec 9, 2018

Hello, @mjsax @vvcephei . Thanks for a quick review and detailed explanation of the ticket.
I will rework PR in a few hours.

* Use existing `AbstractStateStore implements WrappedStateStore`.
* Fixing `TupleForwarder` to work with several `WrappedStateStore` that wraps each other.
@nizhikov
Copy link
Copy Markdown
Contributor Author

nizhikov commented Dec 9, 2018

I reworked PR as follows:

  1. Usage of existing AbstractStateStore implements WrappedStateStore.

Kafka, already have an abstract class for a StateStore implementations that wraps some other state store.
Also, we have marker interface for such stores WrappedStateStore.
If wrapping state store doesn't implement WrappedStateStore it leads to wrong TupleForwarder behaviour(TupleForwarder#cachedStateStore(StateStore) method).
Wrong TupleForward behaviour leads to org.apache.kafka.streams.scala.WordCountTest fails.

  1. Fixed TupleForwarder to work with several WrappedStateStore that wraps each other.

Right now TupleForwarder#cachedStateStore(StateStore) works correctly only if we have one layer of wrapped state store(and my patch introduce second). I fix this method to work with any count of wrapping state stores.

  1. I also change StateStoreReadOnlyDecorator to extends AbstractStateStore(and implements WrappedStateStore).

If we don't extend AbstractStateStore we still have green tests(see #5865), but it seems that we must implements WrappedStateStore for all decorators.

Copy link
Copy Markdown
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Couple of comments/nits.

Copy link
Copy Markdown
Contributor

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good! I had one nitpick...

Thanks for this,
-John

}

@Override
public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

@vvcephei
Copy link
Copy Markdown
Contributor

vvcephei commented Dec 10, 2018

The java 11 test failure is unrelated:

kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsToEarliestOnOneTopicAndPartition

Error Message
java.lang.AssertionError: expected:<Map(bar-0 -> 50, bar-1 -> 0)> but was:<Map(bar-0 -> 50, bar-1 -> 50)>
Stacktrace
java.lang.AssertionError: expected:<Map(bar-0 -> 50, bar-1 -> 0)> but was:<Map(bar-0 -> 50, bar-1 -> 50)>
	at org.junit.Assert.fail(Assert.java:88)
	at org.junit.Assert.failNotEquals(Assert.java:834)
	at org.junit.Assert.assertEquals(Assert.java:118)
	at org.junit.Assert.assertEquals(Assert.java:144)
	at kafka.admin.ResetConsumerGroupOffsetTest.resetAndAssertOffsetsCommitted(ResetConsumerGroupOffsetTest.scala:388)
	at kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsToEarliestOnOneTopicAndPartition(ResetConsumerGroupOffsetTest.scala:247)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/1162/

@vvcephei
Copy link
Copy Markdown
Contributor

Retest this, please.

@nizhikov
Copy link
Copy Markdown
Contributor Author

@mjsax I fixed all your comments. Tests are green. Please, review.

@mjsax mjsax merged commit c142809 into apache:trunk Dec 11, 2018
@mjsax
Copy link
Copy Markdown
Member

mjsax commented Dec 11, 2018

Merged to trunk.

Thanks for the PR @nizhikov!

@nizhikov
Copy link
Copy Markdown
Contributor Author

@mjsax @vvcephei

Guys, thanks for the review and ticket description. I Appreciate it.

@vvcephei
Copy link
Copy Markdown
Contributor

No problem, @nizhikov ; thanks for the contribution!

pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…pache#6016)

Reviewer: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants