KAFKA-7245 (Deprecate WindowStore#put(key, value)) :- The method in t…#7105

Merged
mjsax merged 15 commits into apache:trunk from omanges:trunk
Oct 7, 2019
@omanges
Contributor

@omanges omanges commented Jul 22, 2019

WindowStore#put(key, value) takes no timestamp parameter, so it is ambiguous which window the key belongs to.

The Kafka Streams module has a window state store that stores the aggregated values for a key in a given time frame. The window store is defined as an interface, and this interface has an odd method, put(key, value), which takes no timestamp parameter, even though a timestamp is needed to determine which window frame the key belongs to. Instead, the method uses the current record timestamp to determine the window frame (as stated in the method's description), which makes WindowStore error-prone. The method description also says that the overload with a timestamp parameter should be used instead; that overload already exists in the interface and expects the key, the value, and the start timestamp of the window to which the key belongs. By deprecating (and eventually removing) put(key, value), we can prevent this inconsistency.
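
To illustrate the ambiguity described above, here is a minimal sketch. ToyWindowStore, its string-keyed map, and setCurrentRecordTimestamp are hypothetical stand-ins invented for this example; they are not the Kafka Streams WindowStore API and only mimic the idea that the two-argument put depends on hidden state.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for a window store; NOT the Kafka Streams WindowStore API. */
public class ToyWindowStore {
    // Entries are keyed by "key@windowStartTimestamp" to keep the sketch short.
    private final Map<String, String> data = new HashMap<>();

    // Hypothetical stand-in for the current record timestamp held by a processor context.
    private long currentRecordTimestamp = 0L;

    public void setCurrentRecordTimestamp(final long timestamp) {
        this.currentRecordTimestamp = timestamp;
    }

    /**
     * Implicit variant: the window is chosen by hidden state, not by the caller.
     *
     * @deprecated use {@link #put(String, String, long)} instead.
     */
    @Deprecated
    public void put(final String key, final String value) {
        put(key, value, currentRecordTimestamp);
    }

    /** Explicit variant: the caller states which window the key belongs to. */
    public void put(final String key, final String value, final long windowStartTimestamp) {
        data.put(key + "@" + windowStartTimestamp, value);
    }

    /** Returns the value stored for the key in the given window, or null if absent. */
    public String fetch(final String key, final long windowStartTimestamp) {
        return data.get(key + "@" + windowStartTimestamp);
    }
}
```

With the two-argument variant, the same call can land in different windows depending on what the surrounding code last set as the current timestamp; the three-argument variant makes the target window visible at the call site.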

  • Because the method is deprecated, the test classes that use it will need to be changed when the method is later removed. The affected classes are:
  1. WindowStoreFacadeTest.java
  2. WindowBytesStoreTest.java
  3. SimpleBenchmark.java (benchmarking)
  4. RocksDBWindowStoreTest.java
  5. ProcessorContextImplTest.java
  6. MeteredWindowStoreTest.java
  7. InMemoryWindowStoreTest.java
  8. ChangeLoggingWindowBytesStoreTest.java
  9. CachingWindowStoreTest.java
  10. ChangeLoggingTimestampedWindowBytesStoreTest.java

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

…he WindowStore#put(key, value) has no timestamp as parameter, so it causes inconsistency to identify to which window does the key belong.
@mjsax
Member

mjsax commented Jul 23, 2019

Thanks for the PR @omanges -- seems the code does not compile... Please update your PR to make it compile so we can review it.

@mjsax
Member

mjsax commented Jul 23, 2019

Copied from the Jira ticket:

Hi Matthias J. Sax, as the method in the WindowStore interface and in the other interfaces and classes that implement WindowStore has been annotated with @deprecated, the test cases that use this method are failing in the build. Do the test cases also need to be updated?

The build does not run any tests; it fails because the code does not compile (note that compiler warnings fail the build):

Task :streams:compileJava
11:33:40 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java:332: warning: [dep-ann] deprecated item is not annotated with @deprecated
11:33:40 public void put(final K key,

...

Are you referring to a local run of the tests? If so, why exactly did the tests fail?

@omanges
Contributor Author

omanges commented Jul 23, 2019

warning: [deprecation] put(K,V) in WindowStore has been deprecated

Similar warnings are generated. Because of this, the local build also fails to compile. Does this mean we also need to change the test cases that use the method we have marked as deprecated?

@mjsax
Member

mjsax commented Jul 23, 2019

Yes, you need to add @Deprecated (if a test implements the interface) or @SuppressWarnings("deprecation") (if the old deprecated API is used).
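
A rough sketch of the two cases, for illustration only. ToyStore, RecordingStore, and DeprecationDemo are hypothetical names made up for this example and are not part of Kafka; only the @Deprecated and @SuppressWarnings("deprecation") annotations themselves are standard Java.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical interface for illustration; not Kafka's WindowStore. */
interface ToyStore {
    /** @deprecated use {@link #put(String, String, long)} instead. */
    @Deprecated
    void put(String key, String value);

    void put(String key, String value, long timestamp);
}

/** Case 1: a class that implements the interface marks its override as deprecated too. */
class RecordingStore implements ToyStore {
    final List<String> calls = new ArrayList<>();

    @Deprecated
    @Override
    public void put(final String key, final String value) {
        calls.add("old:" + key + "=" + value);
    }

    @Override
    public void put(final String key, final String value, final long timestamp) {
        calls.add("new:" + key + "=" + value + "@" + timestamp);
    }
}

/** Case 2: a caller that deliberately uses the deprecated method suppresses the warning. */
public class DeprecationDemo {
    @SuppressWarnings("deprecation")
    static void callOldApi(final ToyStore store) {
        store.put("k", "v");
    }

    // No suppression is needed when calling the non-deprecated overload.
    static void callNewApi(final ToyStore store) {
        store.put("k", "v", 42L);
    }
}
```

Keeping the suppression on the smallest possible scope (a single method rather than the whole class) makes it easier to find the remaining deprecated usages when the method is eventually removed.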

omanges added 2 commits July 24, 2019 11:48
…he WindowStore#put(key, value) has no timestamp as parameter, so it causes inconsistency to identify to which window does the key belong.
@omanges
Contributor Author

omanges commented Jul 24, 2019

I am able to build successfully locally now.

@mjsax
Member

mjsax commented Jul 24, 2019

Checkstyle failed. Try to execute ./gradlew streams:clean streams:checkstyleMain streams:checkstyleTest -- if you run streams:test checkstyle goals are included automatically.

omanges added 2 commits July 25, 2019 10:53
…he WindowStore#put(key, value) has no timestamp as parameter, so it causes inconsistency to identify to which window does the key belong.
Member

@mjsax mjsax left a comment

Thanks for the update @omanges! Some comments.

* to identify the window frame to which the key belongs.
* Use WindowStore#put(key, value, timestamp) instead.
*
*/
Member

We don't need JavaDocs here as we inherit JavaDocs by default automatically. (Similar for all other classes that extend the top level interface -- it's sufficient to have JavaDocs at the top level interface.)

* if the serialized bytes are also null it is interpreted as deletes
* @throws NullPointerException if the given key is {@code null}
*
* @deprecated as timestamp is not provided for the (key, value) pair, this causes inconsistency
Member

nit: for the key-value pair

*
* @deprecated as timestamp is not provided for the (key, value) pair, this causes inconsistency
* to identify the window frame to which the key belongs.
* Use WindowStore#put(key, value, timestamp) instead.
Member

Nit: Use {@link #put(Object, Object, long)} instead.

runGenericBenchmark(streams, "Streams Stateful Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
}

@SuppressWarnings("deprecation")
Member

I think for SimpleBenchmark we should update the code instead of suppressing the warning.

Contributor Author

So the call store.put(key, value) can be replaced by store.put(key, value, timestamp), since we can retrieve the timestamp from the context. @mjsax what do you think?

Member

SGTM.

Contributor Author

I have updated the code in the latest commit; please have a look.

}

@Test
@SuppressWarnings("deprecation")
Member

For this and the other tests in this class: I am not sure what the best approach is, and whether we should rather update the code -- or maybe even duplicate the code?

Similar for some other test classes.

cc @guozhangwang @bbejeck @vvcephei @ableegoldman @cadonna @abbccdda WDYT?

Member

Personally I think we should migrate all these tests to use the non-deprecated put, then add a test or two just verifying that put(key, value) and put(key, value, timestamp) have the same results. But IMO all the tests which aren't explicitly testing the deprecated method should be using the "correct" one.

Contributor

I'd agree with @ableegoldman: for test coverage of functionality, we should touch on non-deprecated code only, and only have a few tests that verify the deprecated methods behave the same as the methods they delegate to.

Member

Thanks! That sounds reasonable to me!

Contributor Author

Hi @mjsax, the timestamp is not present in the local context of the test cases, so in order to replace the deprecated method with the new method we need some timestamp. Can we use DEFAULT_TIMESTAMP, which is defined as a constant in the class CachingWindowStoreTest?

Member

For shouldNotReturnDuplicatesInRanges it seems best to use processorContext.timestamp() -- processorContext is passed in init(), so you just need to add a member variable to the Transformer to store it so you can use it in transform().

Contributor Author

Hi @mjsax, I have made the changes for shouldNotReturnDuplicatesInRanges. For the remaining usages, i.e. for the cachingStore, do we also need to switch to the method with the timestamp?

Contributor Author

Hi @mjsax, it seems that an exception is thrown if timestamp() is called in init().
Exception:
org.apache.kafka.streams.state.internals.CachingWindowStoreTest > shouldNotReturnDuplicatesInRanges FAILED
org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-TRANSFORM-0000000001
at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:96)
at org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:526)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:253)
at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:377)
at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:240)
at org.apache.kafka.streams.state.internals.CachingWindowStoreTest.shouldNotReturnDuplicatesInRanges(CachingWindowStoreTest.java:180)

    Caused by:
    java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:158)
        at org.apache.kafka.streams.state.internals.CachingWindowStoreTest$1.init(CachingWindowStoreTest.java:134)
        at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.init(TransformerSupplierAdapter.java:42)
        at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.init(KStreamFlatTransform.java:51)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:92)

Member

That is expected. You need to add a member variable ProcessorContext context to the class and assign this.context = context in init().

In transform() you can use context.timestamp() to get the current timestamp.
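
A minimal sketch of that pattern, under stated assumptions: ToyContext and TimestampCapturingTransformer are hypothetical, simplified stand-ins written for this example, not the Kafka Streams ProcessorContext or Transformer types. ToyContext only imitates the behavior seen in the stack trace above, where timestamp() throws an IllegalStateException when no record is being processed.

```java
/** Hypothetical, simplified stand-in for a processor context; not the Kafka Streams class. */
class ToyContext {
    // null means "no record is currently being processed"
    private Long recordTimestamp = null;

    void startRecord(final long timestamp) {
        recordTimestamp = timestamp;
    }

    void finishRecord() {
        recordTimestamp = null;
    }

    long timestamp() {
        if (recordTimestamp == null) {
            throw new IllegalStateException("timestamp() should only be called while a record is processed");
        }
        return recordTimestamp;
    }
}

/** Transformer-like class: store the context in init(), read the timestamp in transform(). */
public class TimestampCapturingTransformer {
    private ToyContext context;
    private long lastSeenTimestamp = -1L;

    public void init(final ToyContext context) {
        // Only keep the reference here; calling context.timestamp() at this point would throw.
        this.context = context;
    }

    public void transform(final String key, final String value) {
        // A record is being processed now, so the timestamp is available.
        lastSeenTimestamp = context.timestamp();
    }

    public long lastSeenTimestamp() {
        return lastSeenTimestamp;
    }
}
```

In the real test, the value read in transform() would be passed on as the third argument of the timestamped put rather than stored in a field.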

});
}


Member

nit: avoid unnecessary diffs

omanges added 5 commits July 29, 2019 10:42
…he WindowStore#put(key, value) has no timestamp as parameter, so it causes inconsistency to identify to which window does the key belong.
…he WindowStore#put(key, value) has no timestamp as parameter, so it causes inconsistency to identify to which window does the key belong.
@omanges
Contributor Author

omanges commented Aug 16, 2019

Is the Jenkins server down? I am not able to see the cause of the build failure.

@ableegoldman
Member

Build results get removed after a few days. Probably a flaky test, but try running locally just in case.

Retest this, please

@omanges
Contributor Author

omanges commented Aug 22, 2019

Can someone help me? I am not able to understand why the test cases are failing.

@omanges
Contributor Author

omanges commented Aug 22, 2019

How can I trigger tests again?

@ableegoldman
Member

ableegoldman commented Aug 22, 2019

You can retrigger them by commenting:

retest this, please

@omanges
Contributor Author

omanges commented Aug 23, 2019

Hi please retest this.

@omanges
Contributor Author

omanges commented Aug 23, 2019

The test does not seem to be failing because of my changes; I am unable to understand what the problem is.

@ableegoldman
Member

Yeah, unfortunately there are a number of flaky tests across kafka projects. They can fail for environmental reasons not related to your PR. If you see a failure you believe is unrelated to your changes, please submit a JIRA for the flaky test (or comment on an existing one to note there was an additional failure). Generally we just comment on the PR saying which builds passed and which test failed on the ones that didn't, then retrigger the tests.
However, I notice one of the failing tests may be due to your changes. Can you look into org.apache.kafka.streams.state.internals.CachingWindowStoreTest.shouldNotReturnDuplicatesInRanges ?

@mjsax
Member

mjsax commented Sep 4, 2019

@omanges -- I would recommend running the tests locally first to make sure they pass. If they pass locally and there are test failures on Jenkins, it might be due to test flakiness.

Did you run the tests locally? Also, to trigger a Jenkins retest, you need to use the exact phrase retest this please (without the comma that Sophie used above; Hi please retest this. won't work either).

…he WindowStore#put(key, value) has no timestamp as parameter, so it causes inconsistency to identify to which window does the key belong.
@omanges
Contributor Author

omanges commented Sep 13, 2019

@mjsax I think the first one failed due to some flaky test.

@omanges
Contributor Author

omanges commented Sep 17, 2019

So are there any further changes needed for this PR?

@omanges omanges requested a review from mjsax September 18, 2019 09:50
@omanges omanges requested a review from guozhangwang October 5, 2019 09:41
@mjsax mjsax added the streams label Oct 7, 2019
Member

@ableegoldman ableegoldman left a comment

Thanks @omanges! LGTM

@mjsax mjsax merged commit cfa1067 into apache:trunk Oct 7, 2019
mjsax pushed a commit that referenced this pull request Oct 7, 2019
Implements KIP-474.

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>
@mjsax
Member

mjsax commented Oct 7, 2019

Merged to trunk and cherry-picked to 2.4 branch.

Thanks for the KIP and PR @omanges!

@omanges
Contributor Author

omanges commented Oct 22, 2019

Thanks.

ijuma added a commit to ijuma/kafka that referenced this pull request Apr 28, 2020
…t-for-generated-requests

* apache-github/trunk:
  KAFKA-8932; Add tag for CreateTopicsResponse.TopicConfigErrorCode (KIP-525) (apache#7464)
  KAFKA-8944: Fixed KTable compiler warning. (apache#7393)
  KAFKA-8964: Rename tag client-id for thread-level metrics and below (apache#7429)
  MINOR: remove unused imports in Streams system tests (apache#7468)
  KAFKA-7190; Retain producer state until transactionalIdExpiration time passes (apache#7388)
  KAFKA-8983; AdminClient deleteRecords should not fail all partitions unnecessarily (apache#7449)
  MINOR: Modified Exception handling for KIP-470 (apache#7461)
  KAFKA-7245: Deprecate WindowStore#put(key, value) (apache#7105)
  KAFKA-8179: Part 7, cooperative rebalancing in Streams (apache#7386)
  KAFKA-8985; Add flexible version support to inter-broker APIs (apache#7453)
  MINOR: Bump version to 2.5.0-SNAPSHOT (apache#7455)
@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020