From e90a818b6ee4fc9d5ecc84834e8f94bc4b7cd9ab Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Fri, 17 Mar 2017 12:12:05 +0000 Subject: [PATCH 1/3] blah --- .../internals/RocksDBWindowStoreTest.java | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 7352673c3de78..e0bd38cd73518 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; @@ -39,6 +40,7 @@ import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -622,11 +624,11 @@ public void shouldFetchAndIterateOverExactKeys() throws Exception { final RocksDBWindowStoreSupplier supplier = new RocksDBWindowStoreSupplier<>( "window", - 60 * 1000L * 2, 3, + 0x7a00000000000000L, 2, true, Serdes.String(), Serdes.String(), - windowSize, + 0x7a00000000000000L, true, Collections.emptyMap(), false); @@ -638,12 +640,22 @@ public void shouldFetchAndIterateOverExactKeys() throws Exception { windowStore.put("aa", "0002", 0); windowStore.put("a", "0003", 1); windowStore.put("aa", "0004", 1); - windowStore.put("a", "0005", 60000); + windowStore.put("a", "0005", 0x7a00000000000000L - 1); final List expected = Utils.mkList("0001", "0003", "0005"); + System.out.println("retention is " + retentionPeriod); assertThat(toList(windowStore.fetch("a", 0, Long.MAX_VALUE)), equalTo(expected)); } + @Test + public void should() throws Exception { + final Bytes bytes1 = WindowStoreUtils.toBinaryKey(new byte[]{'a'}, 0x7a00000000000000L -1, 0); + final Bytes bytes2 = WindowStoreUtils.toBinaryKey(new byte[]{'a', 'a'}, 1, 0); + System.out.println(bytes2.compareTo(bytes1)); + System.out.println(bytes1); + System.out.println(bytes2); + + } private void putFirstBatch(final WindowStore store, final long startTime, final MockProcessorContext context) { context.setRecordContext(createRecordContext(startTime)); store.put(0, "zero"); From 6992be597c641aecc850a00b2696922225ea6956 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Mon, 20 Mar 2017 16:29:18 +0000 Subject: [PATCH 2/3] fix window iterators --- .../state/internals/SessionKeySchema.java | 12 +-- .../state/internals/WindowKeySchema.java | 12 +-- .../internals/RocksDBSessionStoreTest.java | 39 ++++++++-- .../internals/RocksDBWindowStoreTest.java | 45 +++++++++-- .../state/internals/SessionKeySchemaTest.java | 74 +++++++++++++++++++ 5 files changed, 159 insertions(+), 23 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java index cd2a4f6409297..edf34a016c411 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java @@ -50,15 +50,17 @@ public HasNextCondition hasNextCondition(final Bytes binaryKey, final long from, return new HasNextCondition() { @Override public boolean hasNext(final KeyValueIterator iterator) { - if (iterator.hasNext()) { + while (iterator.hasNext()) { final Bytes bytes = iterator.peekNextKey(); final Bytes keyBytes = Bytes.wrap(SessionKeySerde.extractKeyBytes(bytes.get())); - if (!keyBytes.equals(binaryKey)) { - return false; - } final long start = SessionKeySerde.extractStart(bytes.get()); final long end = SessionKeySerde.extractEnd(bytes.get()); - return end >= from && start <= to; + if (keyBytes.equals(binaryKey) + && end >= from + && start <= to) { + return true; + } + iterator.next(); } return false; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java index 0a89da77cded8..76faf0eb3d6d8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java @@ -46,14 +46,16 @@ public HasNextCondition hasNextCondition(final Bytes binaryKey, final long from, return new HasNextCondition() { @Override public boolean hasNext(final KeyValueIterator iterator) { - if (iterator.hasNext()) { + while (iterator.hasNext()) { final Bytes bytes = iterator.peekNextKey(); final Bytes keyBytes = WindowStoreUtils.bytesKeyFromBinaryKey(bytes.get()); - if (!keyBytes.equals(binaryKey)) { - return false; - } final long time = WindowStoreUtils.timestampFromBinaryKey(bytes.get()); - return time >= from && time <= to; + if (keyBytes.equals(binaryKey) + && time >= from + && time <= to) { + return true; + } + iterator.next(); } return false; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java index 9082da0306225..9be7c10218c35 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -35,6 +35,8 @@ import java.util.Arrays; import java.util.List; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -42,6 +44,7 @@ public class RocksDBSessionStoreTest { private SessionStore sessionStore; + private MockProcessorContext context; @Before public void before() { @@ -52,11 +55,11 @@ public void before() { Serdes.String(), Serdes.Long()); - final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(), - Serdes.String(), - Serdes.Long(), - new NoOpRecordCollector(), - new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))); + context = new MockProcessorContext(TestUtils.tempDirectory(), + Serdes.String(), + Serdes.Long(), + new NoOpRecordCollector(), + new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))); sessionStore.init(context, sessionStore); } @@ -144,6 +147,32 @@ public void shouldFindSessionsToMerge() throws Exception { assertFalse(results.hasNext()); } + @Test + public void shouldFetchExactKeys() throws Exception { + final RocksDBSegmentedBytesStore bytesStore = + new RocksDBSegmentedBytesStore("session-store", 0x7a00000000000000L, 2, new SessionKeySchema()); + + sessionStore = new RocksDBSessionStore<>(bytesStore, + Serdes.String(), + Serdes.Long()); + + sessionStore.init(context, sessionStore); + + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 2L); + sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L); + sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 20)), 4L); + sessionStore.put(new Windowed<>("a", new SessionWindow(0x7a00000000000000L - 2, 0x7a00000000000000L - 1)), 5L); + + final KeyValueIterator, Long> iterator = sessionStore.findSessions("a", 0, Long.MAX_VALUE); + final List results = new ArrayList<>(); + while (iterator.hasNext()) { + results.add(iterator.next().value); + } + + assertThat(results, equalTo(Arrays.asList(1L, 3L, 5L))); + } + static List, Long>> toList(final KeyValueIterator, Long> iterator) { final List, Long>> results = new ArrayList<>(); while (iterator.hasNext()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index e0bd38cd73518..012c4ce5cc1ea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -40,7 +40,6 @@ import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -642,20 +641,50 @@ public void shouldFetchAndIterateOverExactKeys() throws Exception { windowStore.put("aa", "0004", 1); windowStore.put("a", "0005", 0x7a00000000000000L - 1); + final List expected = Utils.mkList("0001", "0003", "0005"); - System.out.println("retention is " + retentionPeriod); assertThat(toList(windowStore.fetch("a", 0, Long.MAX_VALUE)), equalTo(expected)); } + @SuppressWarnings("unchecked") @Test - public void should() throws Exception { - final Bytes bytes1 = WindowStoreUtils.toBinaryKey(new byte[]{'a'}, 0x7a00000000000000L -1, 0); - final Bytes bytes2 = WindowStoreUtils.toBinaryKey(new byte[]{'a', 'a'}, 1, 0); - System.out.println(bytes2.compareTo(bytes1)); - System.out.println(bytes1); - System.out.println(bytes2); + public void shouldFetchAndIterateOverExactBinaryKeys() throws Exception { + final RocksDBWindowStoreSupplier supplier = + new RocksDBWindowStoreSupplier<>( + "window", + 60000, 2, + true, + Serdes.Bytes(), + Serdes.String(), + 60000, + true, + Collections.emptyMap(), + false); + + windowStore = supplier.get(); + windowStore.init(context, windowStore); + final Bytes key1 = Bytes.wrap(new byte[]{0}); + final Bytes key2 = Bytes.wrap(new byte[]{0, 0}); + final Bytes key3 = Bytes.wrap(new byte[]{0, 0, 0}); + windowStore.put(key1, "1", 0); + windowStore.put(key2, "2", 0); + windowStore.put(key3, "3", 0); + windowStore.put(key1, "4", 1); + windowStore.put(key2, "5", 1); + windowStore.put(key3, "6", 59999); + windowStore.put(key1, "7", 59999); + windowStore.put(key2, "8", 59999); + windowStore.put(key3, "9", 59999); + + final List expectedKey1 = Utils.mkList("1", "4", "7"); + assertThat(toList(windowStore.fetch(key1, 0, Long.MAX_VALUE)), equalTo(expectedKey1)); + final List expectedKey2 = Utils.mkList("2", "5", "8"); + assertThat(toList(windowStore.fetch(key2, 0, Long.MAX_VALUE)), equalTo(expectedKey2)); + final List expectedKey3 = Utils.mkList("3", "6", "9"); + assertThat(toList(windowStore.fetch(key3, 0, Long.MAX_VALUE)), equalTo(expectedKey3)); } + private void putFirstBatch(final WindowStore store, final long startTime, final MockProcessorContext context) { context.setRecordContext(createRecordContext(startTime)); store.put(0, "zero"); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java new file mode 100644 index 0000000000000..7c085ddffcdca --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.SessionKeySerde; +import org.apache.kafka.streams.kstream.internals.SessionWindow; +import org.apache.kafka.test.KeyValueIteratorStub; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; + +public class SessionKeySchemaTest { + + private final SessionKeySchema sessionKeySchema = new SessionKeySchema(); + private DelegatingPeekingKeyValueIterator iterator; + + @Before + public void before() { + final List> keys = Arrays.asList(KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(0, 0))), 1), + KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(0, 0))), 2), + KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(0, 0))), 3), + KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0}), new SessionWindow(10, 20))), 4), + KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0}), new SessionWindow(10, 20))), 5), + KeyValue.pair(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0, 0, 0}), new SessionWindow(10, 20))), 6)); + iterator = new DelegatingPeekingKeyValueIterator<>("foo", new KeyValueIteratorStub<>(keys.iterator())); + } + + @Test + public void shouldFetchExactKeysSkippingLongerKeys() throws Exception { + final List result = getValues(sessionKeySchema.hasNextCondition(Bytes.wrap(new byte[]{0}), 0, Long.MAX_VALUE)); + assertThat(result, equalTo(Arrays.asList(2, 4))); + } + + @Test + public void shouldFetchExactKeySkippingShorterKeys() throws Exception { + final HasNextCondition hasNextCondition = sessionKeySchema.hasNextCondition(Bytes.wrap(new byte[]{0, 0}), 0, Long.MAX_VALUE); + final List results = getValues(hasNextCondition); + assertThat(results, equalTo(Arrays.asList(1, 5))); + } + + + private List getValues(final HasNextCondition hasNextCondition) { + final List results = new ArrayList<>(); + while (hasNextCondition.hasNext(iterator)) { + results.add(iterator.next().value); + } + return results; + } + +} \ No newline at end of file From 8a9f1c1ff7b64e426a37997d584c6a860cf0791d Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Tue, 21 Mar 2017 09:29:21 +0000 Subject: [PATCH 3/3] address comments --- .../streams/kstream/internals/SessionKeySerde.java | 2 +- .../streams/state/internals/SessionKeySchema.java | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java index 7eb8300d6b2a0..7a85c7716c389 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java @@ -120,7 +120,7 @@ public static long extractStart(final byte[] binaryKey) { return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE); } - public static Window extractWindow(final byte[] binaryKey) { + static Window extractWindow(final byte[] binaryKey) { final ByteBuffer buffer = ByteBuffer.wrap(binaryKey); final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE); final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java index edf34a016c411..7d6761c15e999 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java @@ -52,12 +52,10 @@ public HasNextCondition hasNextCondition(final Bytes binaryKey, final long from, public boolean hasNext(final KeyValueIterator iterator) { while (iterator.hasNext()) { final Bytes bytes = iterator.peekNextKey(); - final Bytes keyBytes = Bytes.wrap(SessionKeySerde.extractKeyBytes(bytes.get())); - final long start = SessionKeySerde.extractStart(bytes.get()); - final long end = SessionKeySerde.extractEnd(bytes.get()); - if (keyBytes.equals(binaryKey) - && end >= from - && start <= to) { + final Windowed windowedKey = SessionKeySerde.fromBytes(bytes); + if (windowedKey.key().equals(binaryKey) + && windowedKey.window().end() >= from + && windowedKey.window().start() <= to) { return true; } iterator.next();