Skip to content

KAFKA-5172: Fix fetchPrevious to find the correct session.#2972

Closed
ghost wants to merge 4 commits intoapache:trunkfrom
kyle-winkelman:CachingSessionStore-fetchPrevious
Closed

KAFKA-5172: Fix fetchPrevious to find the correct session.#2972
ghost wants to merge 4 commits intoapache:trunkfrom
kyle-winkelman:CachingSessionStore-fetchPrevious

Conversation

@ghost
Copy link
Copy Markdown

@ghost ghost commented May 4, 2017

Change fetchPrevious to use findSessions with the proper key and timestamps rather than using fetch.

@asfbot
Copy link
Copy Markdown

asfbot commented May 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3463/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented May 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3457/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link
Copy Markdown

asfbot commented May 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3454/
Test FAILed (JDK 8 and Scala 2.12).

@guozhangwang
Copy link
Copy Markdown
Contributor

@dguy could you take a look?

Copy link
Copy Markdown
Contributor

@dguy dguy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@KyleWinkelman thanks for the PR. Can you please add a test to CachingSessionStoreTest that fails without this change?

@ghost
Copy link
Copy Markdown
Author

ghost commented May 4, 2017

@dguy I have added a test case that fails without this change.

@ghost ghost changed the title [KAFKA-5172] Fix fetchPrevious to find the correct session. KAFKA-5172: Fix fetchPrevious to find the correct session. May 4, 2017
@asfbot
Copy link
Copy Markdown

asfbot commented May 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3477/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link
Copy Markdown

asfbot commented May 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3474/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented May 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3483/
Test FAILed (JDK 8 and Scala 2.11).

Copy link
Copy Markdown
Contributor

@dguy dguy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding the test @KyleWinkelman left a couple of comments

*/
package org.apache.kafka.streams.state.internals;

import static org.apache.kafka.streams.state.internals.RocksDBSessionStoreTest.toList;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you re-order the input back to what they where before?

@Test
public void shouldPutAndMaybeForward() throws Exception {
final Windowed<String> a1 = new Windowed<>("a", new SessionWindow(0, 0));
final Windowed<String> a2 = new Windowed<>("a", new SessionWindow(0, 1));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to add the extra class just for this case. We could do something like:

final List<Change<Long>> values = new ArrayList<>();
        cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, Long>() {
            @Override
            public void apply(final Windowed<String> key, final Long newValue, final Long oldValue) {
                values.add(new Change<>(newValue, oldValue));
            }
        });
       ...
      assertThat(values, equalTo(Arrays.asList(new Change<>(1L, null), new Change<>(null, 1L), new Change<>(2L, null))));

assertFalse(results.hasNext());
}

@Test
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe something like: shouldForwardChangedValuesDuringFlush ?

import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.kstream.internals.Change;

public class MockCacheFlushListener<K, V> implements CacheFlushListener<K, V> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment above, i don't think we need this class

@ghost
Copy link
Copy Markdown
Author

ghost commented May 4, 2017

I made the recommended changes and altered the contents of the test a little so we could see a case where the new and old values are set.
Thanks for the comments.

@asfbot
Copy link
Copy Markdown

asfbot commented May 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3495/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented May 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3486/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented May 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3489/
Test FAILed (JDK 7 and Scala 2.10).

@ghost
Copy link
Copy Markdown
Author

ghost commented May 4, 2017

@dguy Do you think we should prevent the Bytes.wrap(serdes.rawKey(key.key())) from being called twice (once in putAndMaybeForward and once in fetchPrevious)? I don't know how costly this is but it could save a little bit of processing. I think we could call it once and change the fetchPrevious to accept Bytes and Window.

    private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
        final Bytes binaryKey = entry.key();
        final RecordContext current = context.recordContext();
        context.setRecordContext(entry.recordContext());
        try {
            final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer(), topic);
            **final Bytes rawKey = Bytes.wrap(serdes.rawKey(key.key()));**
            if (flushListener != null) {
                final AGG newValue = serdes.valueFrom(entry.newValue());
                final AGG oldValue = fetchPrevious(rawKey, key.window());
                if (!(newValue == null && oldValue == null)) {
                    flushListener.apply(key, newValue, oldValue);
                }
            }
            bytesStore.put(new Windowed<>(**rawKey**, key.window()), entry.newValue());
        } finally {
            context.setRecordContext(current);
        }
    }
    private AGG fetchPrevious(**final Bytes rawKey, final Window window**) {
        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = bytesStore
                .findSessions(**rawKey**, window.start(), window.end())) {
            if (!iterator.hasNext()) {
                return null;
            }
            return serdes.valueFrom(iterator.next().value);
        }
    }

@dguy
Copy link
Copy Markdown
Contributor

dguy commented May 5, 2017

@KyleWinkelman yes that makes sense. Thanks!

@ghost
Copy link
Copy Markdown
Author

ghost commented May 5, 2017

Ok made the changes. Anything else required of me for this PR?

@asfbot
Copy link
Copy Markdown

asfbot commented May 5, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3537/
Test FAILed (JDK 8 and Scala 2.12).

Copy link
Copy Markdown
Contributor

@dguy dguy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @KyleWinkelman LGTM

@asfbot
Copy link
Copy Markdown

asfbot commented May 5, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3546/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented May 5, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3540/
Test FAILed (JDK 7 and Scala 2.10).

@guozhangwang
Copy link
Copy Markdown
Contributor

retest this please

@asfbot
Copy link
Copy Markdown

asfbot commented May 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3583/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented May 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3577/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link
Copy Markdown

asfbot commented May 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3573/
Test FAILed (JDK 8 and Scala 2.12).

@guozhangwang
Copy link
Copy Markdown
Contributor

retest this please

@asfbot
Copy link
Copy Markdown

asfbot commented May 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3620/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented May 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3614/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link
Copy Markdown

asfbot commented May 7, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3611/
Test FAILed (JDK 8 and Scala 2.12).

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Merged to trunk.

@asfgit asfgit closed this in e472ee7 May 8, 2017
@ghost ghost deleted the CachingSessionStore-fetchPrevious branch May 15, 2017 13:19
guozhangwang added a commit that referenced this pull request Jan 31, 2019
…for flushing / getter (#6161)

#2972 tried to fix a bug about flushing operation, but it was not complete, since findSessions(key, earliestEnd, latestStart) does not guarantee to only return a single entry since its semantics are to return any sessions whose end > earliestEnd and whose start < latestStart.

I've tried various ways to fix it completely and I ended up having to add a single-point query to the public ReadOnlySessionStore API for the exact needed semantics. It is used for flushing to read the old values (otherwise the wrong old values will be sent downstreams, hence it is a correctness issue) and also for getting the value for value-getters (it is for perf only).
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…for flushing / getter (apache#6161)

apache#2972 tried to fix a bug about flushing operation, but it was not complete, since findSessions(key, earliestEnd, latestStart) does not guarantee to only return a single entry since its semantics are to return any sessions whose end > earliestEnd and whose start < latestStart.

I've tried various ways to fix it completely and I ended up having to add a single-point query to the public ReadOnlySessionStore API for the exact needed semantics. It is used for flushing to read the old values (otherwise the wrong old values will be sent downstreams, hence it is a correctness issue) and also for getting the value for value-getters (it is for perf only).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants