diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 25414f30f1616..c68a49ab3459a 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -291,7 +291,6 @@ - diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java index 33c82cb4a546a..c4522c796286e 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java @@ -33,7 +33,7 @@ public interface SinkTaskContext { * and the configuration is using variable references such as those compatible with * {@link org.apache.kafka.common.config.ConfigTransformer}. */ - public Map configs(); + Map configs(); /** * Reset the consumer offsets for the given topic partitions. SinkTasks should use this if they manage offsets diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java index 2e87986648f50..ddb0a78718351 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java @@ -32,7 +32,7 @@ public interface SourceTaskContext { * and the configuration is using variable references such as those compatible with * {@link org.apache.kafka.common.config.ConfigTransformer}. */ - public Map configs(); + Map configs(); /** * Get the OffsetStorageReader for this SourceTask. diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index ad4bd6e4bb373..f469727ce8e4c 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -2447,7 +2447,7 @@ class Log(@volatile private var _dir: File, // replace old segment with new ones info(s"Replacing overflowed segment $segment with split segments $newSegments") - replaceSegments(newSegments.toList, List(segment), isRecoveredSwapFile = false) + replaceSegments(newSegments.toList, List(segment)) newSegments.toList } catch { case e: Exception => diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java index 2e979ef0188c0..7eb4dc8595130 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java @@ -101,7 +101,6 @@ public byte[] serialize() { final byte[][] headerKeysBytes; final byte[][] headerValuesBytes; - int size = 0; size += Long.BYTES; // value.context.timestamp size += Long.BYTES; // value.context.offset diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index 066d499e5ffc9..c9864506877cc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -150,6 +150,7 @@ public String metricsScope() { * @param maxCacheSize maximum number of items in the LRU (cannot be negative) * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used to build * an LRU Map based store + * @throws IllegalArgumentException if {@code maxCacheSize} is negative */ public static KeyValueBytesStoreSupplier lruMap(final String name, final int maxCacheSize) { Objects.requireNonNull(name, "name cannot be null"); @@ -231,6 +232,7 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name, * caching and means that null values will be ignored. * @return an instance of {@link WindowBytesStoreSupplier} * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} + * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize} */ public static WindowBytesStoreSupplier persistentWindowStore(final String name, final Duration retentionPeriod, @@ -257,6 +259,7 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name, * caching and means that null values will be ignored. * @return an instance of {@link WindowBytesStoreSupplier} * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} + * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize} */ public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final String name, final Duration retentionPeriod, @@ -328,6 +331,7 @@ private static WindowBytesStoreSupplier persistentWindowStore(final String name, * caching and means that null values will be ignored. * @return an instance of {@link WindowBytesStoreSupplier} * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} + * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize} */ public static WindowBytesStoreSupplier inMemoryWindowStore(final String name, final Duration retentionPeriod, @@ -481,7 +485,7 @@ public static StoreBuilder> windowStoreBuilder(final Wi *

* The provided supplier should not be a supplier for * {@link WindowStore WindowStores}. For this case, passed in timestamps will be dropped and not stored in the - * windows-store. On read, no valid timestamp but a dummy timestamp will be returned. + * window-store. On read, no valid timestamp but a dummy timestamp will be returned. * * @param supplier a {@link WindowBytesStoreSupplier} (cannot be {@code null}) * @param keySerde the key serde to use diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java index 2071ca786292b..7888316bacf7b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java @@ -33,7 +33,8 @@ public KeyValueStoreBuilder(final KeyValueBytesStoreSupplier storeSupplier, final Serde valueSerde, final Time time) { super(storeSupplier.name(), keySerde, valueSerde, time); - Objects.requireNonNull(storeSupplier, "bytesStoreSupplier can't be null"); + Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); + Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's metricsScope can't be null"); this.storeSupplier = storeSupplier; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java index 3c386f3f7db66..f61ebd47a95e3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java @@ -28,10 +28,11 @@ import org.apache.kafka.streams.state.WindowStore; /** - * A Metered {@link MeteredTimestampedWindowStore} wrapper that is used for recording operation metrics, and hence its + * A Metered {@link TimestampedWindowStore} wrapper that is used for recording operation metrics, and hence its * inner WindowStore implementation do not need to provide its own metrics collecting functionality. * The inner {@link WindowStore} of this class is of type <Bytes,byte[]>, hence we use {@link Serde}s * to convert from <K,ValueAndTimestamp<V>> to <Bytes,byte[]> + * * @param * @param */ diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java index 51ef319da94e6..d0d039420673c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java @@ -33,7 +33,8 @@ public SessionStoreBuilder(final SessionBytesStoreSupplier storeSupplier, final Serde keySerde, final Serde valueSerde, final Time time) { - super(Objects.requireNonNull(storeSupplier, "supplier cannot be null").name(), keySerde, valueSerde, time); + super(Objects.requireNonNull(storeSupplier, "storeSupplier cannot be null").name(), keySerde, valueSerde, time); + Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's metricsScope can't be null"); this.storeSupplier = storeSupplier; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java index be8f259366841..cea10887e18ad 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java @@ -46,7 +46,8 @@ public TimestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier storeSup keySerde, valueSerde == null ? null : new ValueAndTimestampSerde<>(valueSerde), time); - Objects.requireNonNull(storeSupplier, "bytesStoreSupplier can't be null"); + Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); + Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's metricsScope can't be null"); this.storeSupplier = storeSupplier; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java index a54426216c24a..290b0ff171600 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java @@ -46,7 +46,8 @@ public TimestampedWindowStoreBuilder(final WindowBytesStoreSupplier storeSupplie final Serde valueSerde, final Time time) { super(storeSupplier.name(), keySerde, valueSerde == null ? null : new ValueAndTimestampSerde<>(valueSerde), time); - Objects.requireNonNull(storeSupplier, "bytesStoreSupplier can't be null"); + Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); + Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's metricsScope can't be null"); this.storeSupplier = storeSupplier; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java index 722564572093a..5876f78c64e97 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java @@ -24,6 +24,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Objects; + public class WindowStoreBuilder extends AbstractStoreBuilder> { private final Logger log = LoggerFactory.getLogger(WindowStoreBuilder.class); @@ -34,6 +36,8 @@ public WindowStoreBuilder(final WindowBytesStoreSupplier storeSupplier, final Serde valueSerde, final Time time) { super(storeSupplier.name(), keySerde, valueSerde, time); + Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); + Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's metricsScope can't be null"); this.storeSupplier = storeSupplier; } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java index d7e19c7e9e89a..43f4d6d8e02fb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java @@ -190,7 +190,7 @@ void runTestWithDriver(final TestRecord expectedFinalResult, final } final TestRecord updatedExpectedFinalResult = - new TestRecord( + new TestRecord<>( expectedFinalResult.key(), expectedFinalResult.value(), null, diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java index 9d7c23d73c631..800ea9652d53b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -441,7 +441,6 @@ public void shouldAllowDisablingChangelog() { final String input = "input" + testId; final String outputSuppressed = "output-suppressed" + testId; final String outputRaw = "output-raw" + testId; - final String changeLog = "suppressionintegrationtest-shouldAllowDisablingChangelog-KTABLE-SUPPRESS-STATE-STORE-0000000004-changelog"; cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java index c5c06c0de440a..c3528272e04b1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java @@ -816,7 +816,7 @@ private static void verify(final List> results, for (final TestRecord result : results) { final KeyValueTimestamp expected = expectedIterator.next(); try { - assertThat(result, equalTo(new TestRecord(expected.key(), expected.value(), null, expected.timestamp()))); + assertThat(result, equalTo(new TestRecord<>(expected.key(), expected.value(), null, expected.timestamp()))); } catch (final AssertionError e) { throw new AssertionError(printRecords(results) + " != " + expectedResults, e); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TimestampedKeyValueStoreMaterializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TimestampedKeyValueStoreMaterializerTest.java index f963786a3c3c0..77f4ab8f0df4c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TimestampedKeyValueStoreMaterializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TimestampedKeyValueStoreMaterializerTest.java @@ -111,6 +111,7 @@ public void shouldCreateKeyValueStoreWithTheProvidedInnerStore() { final InMemoryKeyValueStore store = new InMemoryKeyValueStore("name"); EasyMock.expect(supplier.name()).andReturn("name").anyTimes(); EasyMock.expect(supplier.get()).andReturn(store); + EasyMock.expect(supplier.metricsScope()).andReturn("metricScope"); EasyMock.replay(supplier); final MaterializedInternal> materialized = diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java index fd427e3439fc3..851bb66ee170c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.ReadOnlySessionStore; import org.apache.kafka.streams.state.ReadOnlyWindowStore; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.TimestampedKeyValueStore; @@ -90,6 +91,14 @@ public void before() { false), Serdes.String(), Serdes.String()).build()); + stores.put( + "s-store", + Stores.sessionStoreBuilder( + Stores.inMemorySessionStore( + "s-store", + Duration.ofMillis(10L)), + Serdes.String(), + Serdes.String()).build()); final ProcessorContextImpl mockContext = niceMock(ProcessorContextImpl.class); expect(mockContext.applicationId()).andStubReturn("appId"); @@ -175,6 +184,26 @@ public void shouldReturnTimestampedKeyValueStoreAsKeyValueStore() { } } + @Test + public void shouldReturnWindowStore() { + final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores); + final List> stores = + provider.stores("w-store", QueryableStoreTypes.windowStore()); + assertEquals(1, stores.size()); + for (final ReadOnlyWindowStore store : stores) { + assertThat(store, instanceOf(ReadOnlyWindowStore.class)); + assertThat(store, not(instanceOf(TimestampedWindowStore.class))); + } + } + + @Test + public void shouldNotReturnWindowStoreAsTimestampedStore() { + final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores); + final List>> stores = + provider.stores("w-store", QueryableStoreTypes.timestampedWindowStore()); + assertEquals(0, stores.size()); + } + @Test public void shouldReturnTimestampedWindowStoreAsWindowStore() { final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores); @@ -186,4 +215,15 @@ public void shouldReturnTimestampedWindowStoreAsWindowStore() { assertThat(store, not(instanceOf(TimestampedWindowStore.class))); } } + + @Test + public void shouldReturnSessionStore() { + final GlobalStateStoreProvider provider = new GlobalStateStoreProvider(stores); + final List> stores = + provider.stores("s-store", QueryableStoreTypes.sessionStore()); + assertEquals(1, stores.size()); + for (final ReadOnlySessionStore store : stores) { + assertThat(store, instanceOf(ReadOnlySessionStore.class)); + } + } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java index 1f4384a8c708d..cb6d848dd2e02 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java @@ -34,8 +34,13 @@ import java.util.Collections; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertThrows; @RunWith(EasyMockRunner.class) public class KeyValueStoreBuilderTest { @@ -50,6 +55,7 @@ public class KeyValueStoreBuilderTest { public void setUp() { EasyMock.expect(supplier.get()).andReturn(inner); EasyMock.expect(supplier.name()).andReturn("name"); + expect(supplier.metricsScope()).andReturn("metricScope"); EasyMock.replay(supplier); builder = new KeyValueStoreBuilder<>( supplier, @@ -134,9 +140,16 @@ public void shouldThrowNullPointerIfTimeIsNull() { new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerIfMetricsScopeIsNull() { - new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()); + reset(supplier); + expect(supplier.get()).andReturn(new RocksDBStore("name", null)); + expect(supplier.name()).andReturn("name"); + replay(supplier); + + final Exception e = assertThrows(NullPointerException.class, + () -> new KeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); + assertThat(e.getMessage(), equalTo("storeSupplier's metricsScope can't be null")); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java index 3e3c1a7f9a496..352b0cbe7779a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java @@ -51,10 +51,10 @@ public class SessionStoreBuilderTest { private SessionStoreBuilder builder; @Before - public void setUp() throws Exception { - + public void setUp() { expect(supplier.get()).andReturn(inner); expect(supplier.name()).andReturn("name"); + expect(supplier.metricsScope()).andReturn("metricScope"); replay(supplier); builder = new SessionStoreBuilder<>( @@ -120,7 +120,7 @@ public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() { @Test public void shouldThrowNullPointerIfInnerIsNull() { final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime())); - assertThat(e.getMessage(), equalTo("supplier cannot be null")); + assertThat(e.getMessage(), equalTo("storeSupplier cannot be null")); } @Test @@ -146,8 +146,21 @@ public void shouldThrowNullPointerIfTimeIsNull() { @Test public void shouldThrowNullPointerIfMetricsScopeIsNull() { - final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); - assertThat(e.getMessage(), equalTo("name cannot be null")); + reset(supplier); + expect(supplier.get()).andReturn(new RocksDBSessionStore( + new RocksDBSegmentedBytesStore( + "name", + null, + 10L, + 5L, + new SessionKeySchema()) + )); + expect(supplier.name()).andReturn("name"); + replay(supplier); + + final Exception e = assertThrows(NullPointerException.class, + () -> new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); + assertThat(e.getMessage(), equalTo("storeSupplier's metricsScope can't be null")); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index db827e8730628..0ac170a86f8de 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -46,6 +46,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.ReadOnlySessionStore; import org.apache.kafka.streams.state.ReadOnlyWindowStore; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.TimestampedKeyValueStore; @@ -125,6 +126,14 @@ public void before() { Serdes.String(), Serdes.String()), "the-processor"); + topology.addStateStore( + Stores.sessionStoreBuilder( + Stores.inMemorySessionStore( + "session-store", + Duration.ofMillis(10L)), + Serdes.String(), + Serdes.String()), + "the-processor"); final Properties properties = new Properties(); final String applicationId = "applicationId"; @@ -258,6 +267,17 @@ public void shouldFindTimestampedWindowStoresAsWindowStore() { } } + @Test + public void shouldFindSessionStores() { + mockThread(true); + final List> sessionStores = + provider.stores(StoreQueryParameters.fromNameAndType("session-store", QueryableStoreTypes.sessionStore())); + assertEquals(2, sessionStores.size()); + for (final ReadOnlySessionStore store: sessionStores) { + assertThat(store, instanceOf(ReadOnlySessionStore.class)); + } + } + @Test(expected = InvalidStateStoreException.class) public void shouldThrowInvalidStoreExceptionIfKVStoreClosed() { mockThread(true); @@ -286,6 +306,13 @@ public void shouldThrowInvalidStoreExceptionIfTsWindowStoreClosed() { provider.stores(StoreQueryParameters.fromNameAndType("timestamped-window-store", QueryableStoreTypes.timestampedWindowStore())); } + @Test(expected = InvalidStateStoreException.class) + public void shouldThrowInvalidStoreExceptionIfSessionStoreClosed() { + mockThread(true); + taskOne.getStore("session-store").close(); + provider.stores(StoreQueryParameters.fromNameAndType("session-store", QueryableStoreTypes.sessionStore())); + } + @Test public void shouldReturnEmptyListIfNoStoresFoundWithName() { mockThread(true); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java index c3ad6b329b444..b79d67e030924 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java @@ -36,6 +36,7 @@ import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.reset; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertThrows; @@ -52,6 +53,7 @@ public class TimestampedKeyValueStoreBuilderTest { public void setUp() { expect(supplier.get()).andReturn(inner); expect(supplier.name()).andReturn("name"); + expect(supplier.metricsScope()).andReturn("metricScope"); expect(inner.persistent()).andReturn(true).anyTimes(); replay(supplier, inner); @@ -168,7 +170,14 @@ public void shouldThrowNullPointerIfTimeIsNull() { @Test public void shouldThrowNullPointerIfMetricsScopeIsNull() { - assertThrows(NullPointerException.class, () -> new TimestampedKeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); + reset(supplier); + expect(supplier.get()).andReturn(new RocksDBTimestampedStore("name", null)); + expect(supplier.name()).andReturn("name"); + replay(supplier); + + final Exception e = assertThrows(NullPointerException.class, + () -> new TimestampedKeyValueStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); + assertThat(e.getMessage(), equalTo("storeSupplier's metricsScope can't be null")); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java index 539df95d73f20..586ec73ea66f8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java @@ -39,6 +39,7 @@ import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.reset; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; @@ -56,6 +57,7 @@ public class TimestampedWindowStoreBuilderTest { public void setUp() { expect(supplier.get()).andReturn(inner); expect(supplier.name()).andReturn("name"); + expect(supplier.metricsScope()).andReturn("metricScope"); expect(inner.persistent()).andReturn(true).anyTimes(); replay(supplier, inner); @@ -200,4 +202,23 @@ public void shouldThrowNullPointerIfTimeIsNull() { assertThrows(NullPointerException.class, () -> new TimestampedWindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null)); } + @Test + public void shouldThrowNullPointerIfMetricsScopeIsNull() { + reset(supplier); + expect(supplier.get()).andReturn(new RocksDBTimestampedWindowStore( + new RocksDBTimestampedSegmentedBytesStore( + "name", + null, + 10L, + 5L, + new WindowKeySchema()), + false, + 1L)); + expect(supplier.name()).andReturn("name"); + replay(supplier); + final Exception e = assertThrows(NullPointerException.class, + () -> new TimestampedWindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); + assertThat(e.getMessage(), equalTo("storeSupplier's metricsScope can't be null")); + } + } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java index ed43c4a9dfbdb..e8dc286a0b192 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java @@ -38,9 +38,12 @@ import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; @RunWith(EasyMockRunner.class) public class WindowStoreBuilderTest { @@ -55,6 +58,7 @@ public class WindowStoreBuilderTest { public void setUp() { expect(supplier.get()).andReturn(inner); expect(supplier.name()).andReturn("name"); + expect(supplier.metricsScope()).andReturn("metricScope"); replay(supplier); builder = new WindowStoreBuilder<>( @@ -154,4 +158,23 @@ public void shouldThrowNullPointerIfTimeIsNull() { new WindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null); } + @Test + public void shouldThrowNullPointerIfMetricsScopeIsNull() { + reset(supplier); + expect(supplier.get()).andReturn(new RocksDBWindowStore( + new RocksDBSegmentedBytesStore( + "name", + null, + 10L, + 5L, + new WindowKeySchema()), + false, + 1L)); + expect(supplier.name()).andReturn("name"); + replay(supplier); + + final Exception e = assertThrows(NullPointerException.class, + () -> new WindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); + assertThat(e.getMessage(), equalTo("storeSupplier's metricsScope can't be null")); + } } \ No newline at end of file diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java index 69d2ddae7637c..a307e595a870a 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java @@ -671,7 +671,7 @@ public TaskState call() throws Exception { /** * Initiate shutdown, but do not wait for it to complete. */ - public void beginShutdown(boolean stopAgents) throws ExecutionException, InterruptedException { + public void beginShutdown(boolean stopAgents) { if (shutdown.compareAndSet(false, true)) { executor.submit(new Shutdown(stopAgents)); } @@ -680,7 +680,7 @@ public void beginShutdown(boolean stopAgents) throws ExecutionException, Interru /** * Wait for shutdown to complete. May be called prior to beginShutdown. */ - public void waitForShutdown() throws ExecutionException, InterruptedException { + public void waitForShutdown() throws InterruptedException { while (!executor.awaitTermination(1, TimeUnit.DAYS)) { } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java index 4b262eff8b54a..42127fdb34635 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java @@ -79,7 +79,7 @@ public ProduceBenchWorker(String id, ProduceBenchSpec spec) { @Override public void start(Platform platform, WorkerStatusTracker status, - KafkaFutureImpl doneFuture) throws Exception { + KafkaFutureImpl doneFuture) { if (!running.compareAndSet(false, true)) { throw new IllegalStateException("ProducerBenchWorker is already running."); }