From eaa62936e8d1e47d34e010c8b0ff3299fa41297b Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 14 Mar 2018 13:58:54 -0700 Subject: [PATCH 1/4] MINOR: fix KafkaStreams#cleanUp() to should throw and fail for lock conflict --- .../processor/internals/StateDirectory.java | 52 +++- .../kafka/streams/KafkaStreamsTest.java | 266 +++++++++++++++++- 2 files changed, 304 insertions(+), 14 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index f5c4c31d0839a..95cdbfd40170c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -125,11 +125,16 @@ private String logPrefix() { /** * Get the lock for the {@link TaskId}s directory if it is available - * @param taskId + * @param taskId the id of the task to get a lock on * @return true if successful - * @throws IOException + * @throws IOException if lock could not be aquired */ synchronized boolean lock(final TaskId taskId) throws IOException { + return lock(taskId, false); + } + + private synchronized boolean lock(final TaskId taskId, + final boolean throwOverlappingFileLockException) throws IOException { if (!createStateDirectory) { return true; } @@ -164,7 +169,12 @@ synchronized boolean lock(final TaskId taskId) throws IOException { return false; } - final FileLock lock = tryLock(channel); + final FileLock lock; + if (throwOverlappingFileLockException) { + lock = channel.tryLock(); + } else { + lock = tryLock(channel); + } if (lock != null) { locks.put(taskId, new LockAndOwner(Thread.currentThread().getName(), lock)); @@ -174,10 +184,13 @@ synchronized boolean lock(final TaskId taskId) throws IOException { } synchronized boolean lockGlobalState() throws IOException { + return lockGlobalState(false); + } + + private synchronized boolean lockGlobalState(final boolean throwOverlappingFileLockException) throws IOException { if (!createStateDirectory) { return true; } - if (globalStateLock != null) { log.trace("{} Found cached state dir lock for the global task", logPrefix()); return true; @@ -193,7 +206,12 @@ synchronized boolean lockGlobalState() throws IOException { // file, in this case we will return immediately indicating locking failed. return false; } - final FileLock fileLock = tryLock(channel); + final FileLock fileLock; + if (throwOverlappingFileLockException) { + fileLock = channel.tryLock(); + } else { + fileLock = tryLock(channel); + } if (fileLock == null) { channel.close(); return false; @@ -235,6 +253,7 @@ synchronized void unlock(final TaskId taskId) throws IOException { } } + @SuppressWarnings("ThrowFromFinallyBlock") public synchronized void clean() { try { cleanRemovedTasks(0, true); @@ -242,13 +261,24 @@ public synchronized void clean() { // this is already logged within cleanRemovedTasks throw new StreamsException(e); } + try { - if (stateDir.exists()) { + if (stateDir.exists() && lockGlobalState(true)) { Utils.delete(globalStateDir().getAbsoluteFile()); } + } catch (final OverlappingFileLockException e) { + log.error("{} Failed to get the global state directory lock.", logPrefix(), e); + throw new StreamsException(e); } catch (final IOException e) { log.error("{} Failed to delete global state directory due to an unexpected exception", logPrefix(), e); throw new StreamsException(e); + } finally { + try { + unlockGlobalState(); + } catch (final IOException e) { + log.error("{} Failed to release global state directory lock.", logPrefix()); + throw new StreamsException(e); + } } } @@ -267,6 +297,7 @@ public synchronized void cleanRemovedTasks(final long cleanupDelayMs) { } } + @SuppressWarnings("ThrowFromFinallyBlock") private synchronized void cleanRemovedTasks(final long cleanupDelayMs, final boolean manualUserCall) throws Exception { final File[] taskDirs = listTaskDirectories(); @@ -279,7 +310,7 @@ private synchronized void cleanRemovedTasks(final long cleanupDelayMs, final TaskId id = TaskId.parse(dirName); if (!locks.containsKey(id)) { try { - if (lock(id)) { + if (lock(id, manualUserCall)) { final long now = time.milliseconds(); final long lastModifiedMs = taskDir.lastModified(); if (now > lastModifiedMs + cleanupDelayMs || manualUserCall) { @@ -302,11 +333,8 @@ private synchronized void cleanRemovedTasks(final long cleanupDelayMs, } } } catch (final OverlappingFileLockException e) { - // locked by another thread - if (manualUserCall) { - log.error("{} Failed to get the state directory lock.", logPrefix(), e); - throw e; - } + log.error("{} Failed to get the state directory lock.", logPrefix(), e); + throw e; } catch (final IOException e) { log.error("{} Failed to delete the state directory.", logPrefix(), e); if (manualUserCall) { diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 2cd31ba736297..b394f0ea18bc9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.ThreadMetadata; import org.apache.kafka.streams.processor.internals.GlobalStreamThread; import org.apache.kafka.streams.processor.internals.StreamThread; @@ -62,6 +63,11 @@ import java.nio.file.Path; import java.time.Duration; import java.util.Collection; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.channels.OverlappingFileLockException; +import java.nio.file.StandardOpenOption; +import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -390,7 +396,6 @@ public void globalThreadShouldTimeoutWhenBrokerConnectionCannotBeEstablished() { } catch (final StreamsException expected) { // This is a result of not being able to connect to the broker. } - // There's nothing to assert... We're testing that this operation actually completes. } @Test @@ -540,7 +545,7 @@ public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() { } @Test - public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception { + public void shouldReturnFalseOnCloseWhenThreadsHaveNotTerminated() throws Exception { final AtomicBoolean keepRunning = new AtomicBoolean(true); KafkaStreams streams = null; try { @@ -613,6 +618,232 @@ public void shouldAllowCleanupBeforeStartAndAfterClose() { globalStreams.cleanUp(); } + @Test + public void shouldAllowCleanupIfApplicationIsDown() throws Exception { + final StreamsBuilder builder = new StreamsBuilder(); + final KafkaStreams streams = new KafkaStreams(builder.build(), props); + + final File taskDir = new File( + props.getProperty(StreamsConfig.STATE_DIR_CONFIG) + + File.separator + props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + + File.separator + new TaskId(0, 0)); + final File testFile = new File(taskDir, "test-file"); + + assertTrue(taskDir.mkdirs()); + assertTrue(testFile.createNewFile()); + + streams.cleanUp(); + assertFalse(testFile.exists()); + assertFalse(taskDir.exists()); + + streams.start(); + streams.close(); + + assertTrue(taskDir.mkdirs()); + assertTrue(testFile.createNewFile()); + + streams.cleanUp(); + assertFalse(testFile.exists()); + assertFalse(taskDir.exists()); + } + + @Test + public void shouldAllowCleanupIfLockIsReleased() throws Exception { + final StreamsBuilder builder = new StreamsBuilder(); + final KafkaStreams streams = new KafkaStreams(builder.build(), props); + + final File taskDir = new File( + props.getProperty(StreamsConfig.STATE_DIR_CONFIG) + + File.separator + props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + + File.separator + new TaskId(0, 0)); + final File testFile = new File(taskDir, "test-file"); + assertTrue(taskDir.mkdirs()); + assertTrue(testFile.createNewFile()); + + final File lockFile = new File(taskDir, ".lock"); + final FileChannel channel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE); + + final FileLock lock; + try { + lock = channel.tryLock(); + } catch (final Exception e) { + if (channel != null) { + channel.close(); + } + throw e; + } + try { + lock.release(); + } finally { + channel.close(); + } + + assertTrue(lockFile.exists()); + + streams.cleanUp(); + + assertFalse(testFile.exists()); + assertFalse(taskDir.exists()); + } + + @Test + public void shouldAllowCleanupIfLockIsReleasedFromDifferentThread() throws Exception { + final StreamsBuilder builder = new StreamsBuilder(); + final KafkaStreams streams = new KafkaStreams(builder.build(), props); + + final File taskDir = new File( + props.getProperty(StreamsConfig.STATE_DIR_CONFIG) + + File.separator + props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + + File.separator + new TaskId(0, 0)); + final File testFile = new File(taskDir, "test-file"); + assertTrue(taskDir.mkdirs()); + assertTrue(testFile.createNewFile()); + + final File lockFile = new File(taskDir, ".lock"); + + final LockThread lockThread = new LockThread(lockFile); + lockThread.start(); + + while (!lockThread.lockAcquired) { + Utils.sleep(100); + } + + lockThread.releaseLock = true; + + while (lockThread.lockAcquired) { + Utils.sleep(100); + } + assertNull(lockThread.error); + + assertTrue(lockFile.exists()); + + streams.cleanUp(); + + assertFalse(testFile.exists()); + assertFalse(taskDir.exists()); + + lockThread.isRunning = false; + lockThread.join(); + } + + @Test + public void shouldFailWithOverlappingFileLockException() throws Exception { + final StreamsBuilder builder = new StreamsBuilder(); + final KafkaStreams streams = new KafkaStreams(builder.build(), props); + + final File taskDir = new File( + props.getProperty(StreamsConfig.STATE_DIR_CONFIG) + + File.separator + props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + + File.separator + new TaskId(0, 0)); + assertTrue(taskDir.mkdirs()); + + final File lockFile = new File(taskDir, ".lock"); + + try (final FileChannel channel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { + channel.tryLock(); + try { + streams.cleanUp(); + fail("Should have throw StreamsException"); + } catch (final StreamsException expected) { + assertTrue(expected.getCause() instanceof OverlappingFileLockException); + } + } + } + + @Test + public void shouldFailWithOverlappingFileLockExceptionWhenLockedByDifferentThread() throws Exception { + final StreamsBuilder builder = new StreamsBuilder(); + final KafkaStreams streams = new KafkaStreams(builder.build(), props); + + final File taskDir = new File( + props.getProperty(StreamsConfig.STATE_DIR_CONFIG) + + File.separator + props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + + File.separator + new TaskId(0, 0)); + assertTrue(taskDir.mkdirs()); + + final File lockFile = new File(taskDir, ".lock"); + + final LockThread lockThread = new LockThread(lockFile); + lockThread.start(); + + while (!lockThread.lockAcquired) { + Utils.sleep(100); + } + + try { + try { + streams.cleanUp(); + fail("Should have throw StreamsException"); + } catch (final StreamsException expected) { + assertTrue(expected.getCause() instanceof OverlappingFileLockException); + } + } finally { + lockThread.releaseLock = true; + lockThread.isRunning = false; + lockThread.join(); + } + assertNull(lockThread.error); + } + + @Test + public void shouldFailWithOverlappingFileLockExceptionForGlobalTask() throws Exception { + final StreamsBuilder builder = new StreamsBuilder(); + final KafkaStreams streams = new KafkaStreams(builder.build(), props); + + final File taskDir = new File( + props.getProperty(StreamsConfig.STATE_DIR_CONFIG) + + File.separator + props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + + File.separator + "global"); + assertTrue(taskDir.mkdirs()); + + final File lockFile = new File(taskDir, ".lock"); + + try (final FileChannel channel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { + channel.tryLock(); + try { + streams.cleanUp(); + fail("Should have throw StreamsException"); + } catch (final StreamsException expected) { + assertTrue(expected.getCause() instanceof OverlappingFileLockException); + } + } + } + + @Test + public void shouldFailWithOverlappingFileLockExceptionForGlobalTaskWhenLockedByDifferentThread() throws Exception { + final StreamsBuilder builder = new StreamsBuilder(); + final KafkaStreams streams = new KafkaStreams(builder.build(), props); + + final File taskDir = new File( + props.getProperty(StreamsConfig.STATE_DIR_CONFIG) + + File.separator + props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + + File.separator + "global"); + assertTrue(taskDir.mkdirs()); + + final File lockFile = new File(taskDir, ".lock"); + + final LockThread lockThread = new LockThread(lockFile); + lockThread.start(); + + while (!lockThread.lockAcquired) { + Utils.sleep(100); + } + + try { + try { + streams.cleanUp(); + fail("Should have throw StreamsException"); + } catch (final StreamsException expected) { + assertTrue(expected.getCause() instanceof OverlappingFileLockException); + } + } finally { + lockThread.releaseLock = true; + lockThread.isRunning = false; + lockThread.join(); + } + assertNull(lockThread.error); + } + @Test public void shouldThrowOnCleanupWhileRunning() throws InterruptedException { globalStreams.start(); @@ -846,4 +1077,35 @@ public void onChange(final KafkaStreams.State newState, } } + private class LockThread extends Thread { + private final File lockFile; + volatile boolean isRunning = true; + volatile boolean releaseLock = false; + volatile boolean lockAcquired = false; + volatile Exception error = null; + + LockThread(final File lockFile) { + this.lockFile = lockFile; + } + + @Override + public void run() { + try { + try (final FileChannel channel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { + channel.tryLock(); + lockAcquired = true; + while (!releaseLock) { + Utils.sleep(100); + } + } + lockAcquired = false; + } catch (final Exception e) { + error = e; + } + while (isRunning) { + Utils.sleep(100); + } + } + } + } From f27f524709609149be844ba60df89e7f2e796a1c Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 21 Aug 2019 13:51:30 -0700 Subject: [PATCH 2/4] Rebased --- .../kafka/streams/KafkaStreamsTest.java | 328 ++++++++---------- .../kafka/streams/StreamsConfigTest.java | 44 ++- 2 files changed, 191 insertions(+), 181 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index b394f0ea18bc9..2fb583802b771 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -16,15 +16,10 @@ */ package org.apache.kafka.streams; -import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; -import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -35,6 +30,8 @@ import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.ThreadMetadata; import org.apache.kafka.streams.processor.internals.GlobalStreamThread; @@ -49,7 +46,6 @@ import org.apache.kafka.test.MockStateRestoreListener; import org.apache.kafka.test.TestUtils; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; @@ -59,15 +55,14 @@ import java.io.File; import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.time.Duration; -import java.util.Collection; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; +import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.time.Duration; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -80,10 +75,13 @@ import java.util.stream.Collectors; import static java.util.Arrays.asList; -import static org.junit.Assert.assertEquals; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -122,33 +120,10 @@ public void cleanup() { } } - @Test - public void testOsDefaultSocketBufferSizes() { - props.put(CommonClientConfigs.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE); - props.put(CommonClientConfigs.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE); - final KafkaStreams streams = new KafkaStreams(builder.build(), props); - streams.close(); - } - - @Test(expected = KafkaException.class) - public void testInvalidSocketSendBufferSize() { - props.put(CommonClientConfigs.SEND_BUFFER_CONFIG, -2); - final KafkaStreams streams = new KafkaStreams(builder.build(), props); - streams.close(); - } - - @Test(expected = KafkaException.class) - public void testInvalidSocketReceiveBufferSize() { - props.put(CommonClientConfigs.RECEIVE_BUFFER_CONFIG, -2); - final KafkaStreams streams = new KafkaStreams(builder.build(), props); - streams.close(); - } - @Test public void stateShouldTransitToNotRunningIfCloseRightAfterCreated() { globalStreams.close(); - - Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state()); + assertThat(globalStreams.state(), is(KafkaStreams.State.NOT_RUNNING)); } @Test @@ -156,15 +131,15 @@ public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws In final StateListenerStub stateListener = new StateListenerStub(); globalStreams.setStateListener(stateListener); - Assert.assertEquals(0, stateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.CREATED, globalStreams.state()); + assertThat(stateListener.numChanges, is(0)); + assertThat(globalStreams.state(), is(KafkaStreams.State.CREATED)); globalStreams.start(); TestUtils.waitForCondition( () -> stateListener.numChanges == 2, "Streams never started."); - Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state()); + assertThat(globalStreams.state(), is(KafkaStreams.State.RUNNING)); for (final StreamThread thread: globalStreams.threads) { thread.stateListener().onChange( @@ -173,8 +148,8 @@ public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws In StreamThread.State.RUNNING); } - Assert.assertEquals(3, stateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state()); + assertThat(stateListener.numChanges, is(3)); + assertThat(globalStreams.state(), is(KafkaStreams.State.REBALANCING)); for (final StreamThread thread : globalStreams.threads) { thread.stateListener().onChange( @@ -183,8 +158,8 @@ public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws In StreamThread.State.PARTITIONS_REVOKED); } - Assert.assertEquals(3, stateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state()); + assertThat(stateListener.numChanges, is(3)); + assertThat(globalStreams.state(), is(KafkaStreams.State.REBALANCING)); globalStreams.threads[NUM_THREADS - 1].stateListener().onChange( globalStreams.threads[NUM_THREADS - 1], @@ -196,8 +171,8 @@ public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws In StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN); - Assert.assertEquals(3, stateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state()); + assertThat(stateListener.numChanges, is(3)); + assertThat(globalStreams.state(), is(KafkaStreams.State.REBALANCING)); for (final StreamThread thread : globalStreams.threads) { if (thread != globalStreams.threads[NUM_THREADS - 1]) { @@ -208,15 +183,15 @@ public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws In } } - Assert.assertEquals(4, stateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state()); + assertThat(stateListener.numChanges, is(4)); + assertThat(globalStreams.state(), is(KafkaStreams.State.RUNNING)); globalStreams.close(); TestUtils.waitForCondition( () -> stateListener.numChanges == 6, "Streams never closed."); - Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state()); + assertThat(globalStreams.state(), is(KafkaStreams.State.NOT_RUNNING)); } @Test @@ -224,15 +199,15 @@ public void stateShouldTransitToErrorIfAllThreadsDead() throws InterruptedExcept final StateListenerStub stateListener = new StateListenerStub(); globalStreams.setStateListener(stateListener); - Assert.assertEquals(0, stateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.CREATED, globalStreams.state()); + assertThat(stateListener.numChanges, is(0)); + assertThat(globalStreams.state(), is(KafkaStreams.State.CREATED)); globalStreams.start(); TestUtils.waitForCondition( () -> stateListener.numChanges == 2, "Streams never started."); - Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state()); + assertThat(globalStreams.state(), is(KafkaStreams.State.RUNNING)); for (final StreamThread thread : globalStreams.threads) { thread.stateListener().onChange( @@ -241,8 +216,8 @@ public void stateShouldTransitToErrorIfAllThreadsDead() throws InterruptedExcept StreamThread.State.RUNNING); } - Assert.assertEquals(3, stateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state()); + assertThat(stateListener.numChanges, is(3)); + assertThat(globalStreams.state(), is(KafkaStreams.State.REBALANCING)); globalStreams.threads[NUM_THREADS - 1].stateListener().onChange( globalStreams.threads[NUM_THREADS - 1], @@ -254,8 +229,8 @@ public void stateShouldTransitToErrorIfAllThreadsDead() throws InterruptedExcept StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN); - Assert.assertEquals(3, stateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.REBALANCING, globalStreams.state()); + assertThat(stateListener.numChanges, is(3)); + assertThat(globalStreams.state(), is(KafkaStreams.State.REBALANCING)); for (final StreamThread thread : globalStreams.threads) { if (thread != globalStreams.threads[NUM_THREADS - 1]) { @@ -271,8 +246,8 @@ public void stateShouldTransitToErrorIfAllThreadsDead() throws InterruptedExcept } } - Assert.assertEquals(4, stateListener.numChanges); - Assert.assertEquals(KafkaStreams.State.ERROR, globalStreams.state()); + assertThat(stateListener.numChanges, is(4)); + assertThat(globalStreams.state(), is(KafkaStreams.State.ERROR)); globalStreams.close(); @@ -280,7 +255,7 @@ public void stateShouldTransitToErrorIfAllThreadsDead() throws InterruptedExcept TestUtils.waitForCondition( () -> stateListener.numChanges == 6, "Streams never closed."); - Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state()); + assertThat(globalStreams.state(), is(KafkaStreams.State.NOT_RUNNING)); } @Test @@ -317,8 +292,8 @@ public void testStateThreadClose() throws Exception { threadsField.setAccessible(true); final StreamThread[] threads = (StreamThread[]) threadsField.get(streams); - assertEquals(NUM_THREADS, threads.length); - assertEquals(streams.state(), KafkaStreams.State.CREATED); + assertThat(threads.length, is(NUM_THREADS)); + assertThat(streams.state(), is(KafkaStreams.State.CREATED)); streams.start(); TestUtils.waitForCondition( @@ -369,12 +344,12 @@ public void testStateGlobalThreadClose() throws Exception { () -> globalStreamThread.state() == GlobalStreamThread.State.DEAD, "Thread never stopped."); globalStreamThread.join(); - assertEquals(streams.state(), KafkaStreams.State.ERROR); + assertThat(streams.state(), is(KafkaStreams.State.ERROR)); } finally { streams.close(); } - assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING); + assertThat(streams.state(), is(KafkaStreams.State.NOT_RUNNING)); } @Test @@ -391,10 +366,7 @@ public void globalThreadShouldTimeoutWhenBrokerConnectionCannotBeEstablished() { // make sure we have the global state thread running too builder.globalTable("anyTopic"); try (final KafkaStreams streams = new KafkaStreams(builder.build(), props)) { - streams.start(); - fail("expected start() to time out and throw an exception."); - } catch (final StreamsException expected) { - // This is a result of not being able to connect to the broker. + assertThrows(StreamsException.class, streams::start); } } @@ -423,12 +395,13 @@ public void testInitializesAndDestroysMetricsReporters() { try (final KafkaStreams streams = new KafkaStreams(builder.build(), props)) { final int newInitCount = MockMetricsReporter.INIT_COUNT.get(); final int initDiff = newInitCount - oldInitCount; - assertTrue("some reporters should be initialized by calling on construction", initDiff > 0); + assertThat("some reporters should be initialized by calling on construction", + initDiff, greaterThan(0)); streams.start(); final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get(); streams.close(); - assertEquals(oldCloseCount + initDiff, MockMetricsReporter.CLOSE_COUNT.get()); + assertThat(MockMetricsReporter.CLOSE_COUNT.get(), is(oldCloseCount + initDiff)); } } @@ -438,110 +411,59 @@ public void testCloseIsIdempotent() { final int closeCount = MockMetricsReporter.CLOSE_COUNT.get(); globalStreams.close(); - Assert.assertEquals("subsequent close() calls should do nothing", - closeCount, MockMetricsReporter.CLOSE_COUNT.get()); + assertThat("subsequent close() calls should do nothing", + MockMetricsReporter.CLOSE_COUNT.get(), is(closeCount)); } @Test public void testCannotStartOnceClosed() { globalStreams.start(); globalStreams.close(); - try { - globalStreams.start(); - fail("Should have throw IllegalStateException"); - } catch (final IllegalStateException expected) { - // this is ok - } finally { - globalStreams.close(); - } + assertThrows(IllegalStateException.class, globalStreams::start); } @Test public void testCannotStartTwice() { globalStreams.start(); - - try { - globalStreams.start(); - fail("Should throw an IllegalStateException"); - } catch (final IllegalStateException e) { - // this is ok - } finally { - globalStreams.close(); - } + assertThrows(IllegalStateException.class, globalStreams::start); } @Test public void shouldNotSetGlobalRestoreListenerAfterStarting() { globalStreams.start(); - try { - globalStreams.setGlobalStateRestoreListener(new MockStateRestoreListener()); - fail("Should throw an IllegalStateException"); - } catch (final IllegalStateException e) { - // expected - } finally { - globalStreams.close(); - } + assertThrows(IllegalStateException.class, () -> globalStreams.setGlobalStateRestoreListener(new MockStateRestoreListener())); } @Test public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() { globalStreams.start(); - try { - globalStreams.setUncaughtExceptionHandler(null); - fail("Should throw IllegalStateException"); - } catch (final IllegalStateException e) { - // expected - } + assertThrows(IllegalStateException.class, () -> globalStreams.setUncaughtExceptionHandler(null)); } @Test public void shouldThrowExceptionSettingStateListenerNotInCreateState() { globalStreams.start(); - try { - globalStreams.setStateListener(null); - fail("Should throw IllegalStateException"); - } catch (final IllegalStateException e) { - // expected - } + assertThrows(IllegalStateException.class, () -> globalStreams.setStateListener(null)); } @Test - public void testIllegalMetricsConfig() { - props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig"); - - try { - new KafkaStreams(builder.build(), props); - fail("Should have throw ConfigException"); - } catch (final ConfigException expected) { /* expected */ } - } - - @Test - public void testLegalMetricsConfig() { - props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString()); - new KafkaStreams(builder.build(), props).close(); - - props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString()); - new KafkaStreams(builder.build(), props).close(); - } - - @Test(expected = IllegalStateException.class) public void shouldNotGetAllTasksWhenNotRunning() { - globalStreams.allMetadata(); + assertThrows(IllegalStateException.class, globalStreams::allMetadata); } - @Test(expected = IllegalStateException.class) + @Test public void shouldNotGetAllTasksWithStoreWhenNotRunning() { - globalStreams.allMetadataForStore("store"); + assertThrows(IllegalStateException.class, () -> globalStreams.allMetadataForStore("store")); } - @Test(expected = IllegalStateException.class) + @Test public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() { - globalStreams.metadataForKey("store", "key", Serdes.String().serializer()); + assertThrows(IllegalStateException.class, () -> globalStreams.metadataForKey("store", "key", Serdes.String().serializer())); } - @Test(expected = IllegalStateException.class) + @Test public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() { - globalStreams.metadataForKey("store", "key", (topic, key, value, numPartitions) -> 0); + assertThrows(IllegalStateException.class, () -> globalStreams.metadataForKey("store", "key", (topic, key, value, numPartitions) -> 0)); } @Test @@ -591,19 +513,18 @@ public void shouldReturnFalseOnCloseWhenThreadsHaveNotTerminated() throws Except public void shouldReturnThreadMetadata() { globalStreams.start(); final Set threadMetadata = globalStreams.localThreadsMetadata(); - assertNotNull(threadMetadata); - assertEquals(2, threadMetadata.size()); + assertThat(threadMetadata.size(), is(2)); for (final ThreadMetadata metadata : threadMetadata) { assertTrue("#threadState() was: " + metadata.threadState() + "; expected either RUNNING, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED", asList("RUNNING", "STARTING", "PARTITIONS_REVOKED", "PARTITIONS_ASSIGNED", "CREATED").contains(metadata.threadState())); - assertEquals(0, metadata.standbyTasks().size()); - assertEquals(0, metadata.activeTasks().size()); + assertThat(metadata.standbyTasks().size(), is(0)); + assertThat(metadata.activeTasks().size(), is(0)); final String threadName = metadata.threadName(); assertTrue(threadName.startsWith("clientId-StreamThread-")); - assertEquals(threadName + "-consumer", metadata.consumerClientId()); - assertEquals(threadName + "-restore-consumer", metadata.restoreConsumerClientId()); - assertEquals(Collections.singleton(threadName + "-producer"), metadata.producerClientIds()); - assertEquals("clientId-admin", metadata.adminClientId()); + assertThat(metadata.consumerClientId(), is(threadName + "-consumer")); + assertThat(metadata.restoreConsumerClientId(), is(threadName + "-restore-consumer")); + assertThat(metadata.producerClientIds(), is(Collections.singleton(threadName + "-producer"))); + assertThat(metadata.adminClientId(), is("clientId-admin")); } } @@ -729,6 +650,24 @@ public void shouldAllowCleanupIfLockIsReleasedFromDifferentThread() throws Excep @Test public void shouldFailWithOverlappingFileLockException() throws Exception { final StreamsBuilder builder = new StreamsBuilder(); + builder.addGlobalStore( + Stores.timestampedKeyValueStoreBuilder( + Stores.persistentTimestampedKeyValueStore("storeName"), + Serdes.ByteArray(), + Serdes.ByteArray() + ), + "topicName", + Consumed.as("someName"), + () -> new Processor() { + @Override + public void init(final ProcessorContext context) {} + + @Override + public void process(final Object key, final Object value) {} + + @Override + public void close() {} + }); final KafkaStreams streams = new KafkaStreams(builder.build(), props); final File taskDir = new File( @@ -738,21 +677,37 @@ public void shouldFailWithOverlappingFileLockException() throws Exception { assertTrue(taskDir.mkdirs()); final File lockFile = new File(taskDir, ".lock"); - try (final FileChannel channel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { channel.tryLock(); - try { - streams.cleanUp(); - fail("Should have throw StreamsException"); - } catch (final StreamsException expected) { - assertTrue(expected.getCause() instanceof OverlappingFileLockException); - } + + final StreamsException expected = assertThrows(StreamsException.class, streams::cleanUp); + assertThat(expected.getCause(), instanceOf(OverlappingFileLockException.class)); + } finally { + streams.close(); } } @Test public void shouldFailWithOverlappingFileLockExceptionWhenLockedByDifferentThread() throws Exception { final StreamsBuilder builder = new StreamsBuilder(); + builder.addGlobalStore( + Stores.timestampedKeyValueStoreBuilder( + Stores.persistentTimestampedKeyValueStore("storeName"), + Serdes.ByteArray(), + Serdes.ByteArray() + ), + "topicName", + Consumed.as("someName"), + () -> new Processor() { + @Override + public void init(final ProcessorContext context) {} + + @Override + public void process(final Object key, final Object value) {} + + @Override + public void close() {} + }); final KafkaStreams streams = new KafkaStreams(builder.build(), props); final File taskDir = new File( @@ -771,12 +726,8 @@ public void shouldFailWithOverlappingFileLockExceptionWhenLockedByDifferentThrea } try { - try { - streams.cleanUp(); - fail("Should have throw StreamsException"); - } catch (final StreamsException expected) { - assertTrue(expected.getCause() instanceof OverlappingFileLockException); - } + final StreamsException expected = assertThrows(StreamsException.class, streams::cleanUp); + assertThat(expected.getCause(), instanceOf(OverlappingFileLockException.class)); } finally { lockThread.releaseLock = true; lockThread.isRunning = false; @@ -788,6 +739,24 @@ public void shouldFailWithOverlappingFileLockExceptionWhenLockedByDifferentThrea @Test public void shouldFailWithOverlappingFileLockExceptionForGlobalTask() throws Exception { final StreamsBuilder builder = new StreamsBuilder(); + builder.addGlobalStore( + Stores.timestampedKeyValueStoreBuilder( + Stores.persistentTimestampedKeyValueStore("storeName"), + Serdes.ByteArray(), + Serdes.ByteArray() + ), + "topicName", + Consumed.as("someName"), + () -> new Processor() { + @Override + public void init(final ProcessorContext context) {} + + @Override + public void process(final Object key, final Object value) {} + + @Override + public void close() {} + }); final KafkaStreams streams = new KafkaStreams(builder.build(), props); final File taskDir = new File( @@ -800,18 +769,33 @@ public void shouldFailWithOverlappingFileLockExceptionForGlobalTask() throws Exc try (final FileChannel channel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { channel.tryLock(); - try { - streams.cleanUp(); - fail("Should have throw StreamsException"); - } catch (final StreamsException expected) { - assertTrue(expected.getCause() instanceof OverlappingFileLockException); - } + + final StreamsException expected = assertThrows(StreamsException.class, streams::cleanUp); + assertThat(expected.getCause(), instanceOf(OverlappingFileLockException.class)); } } @Test public void shouldFailWithOverlappingFileLockExceptionForGlobalTaskWhenLockedByDifferentThread() throws Exception { final StreamsBuilder builder = new StreamsBuilder(); + builder.addGlobalStore( + Stores.timestampedKeyValueStoreBuilder( + Stores.persistentTimestampedKeyValueStore("storeName"), + Serdes.ByteArray(), + Serdes.ByteArray() + ), + "topicName", + Consumed.as("someName"), + () -> new Processor() { + @Override + public void init(final ProcessorContext context) {} + + @Override + public void process(final Object key, final Object value) {} + + @Override + public void close() {} + }); final KafkaStreams streams = new KafkaStreams(builder.build(), props); final File taskDir = new File( @@ -830,12 +814,8 @@ public void shouldFailWithOverlappingFileLockExceptionForGlobalTaskWhenLockedByD } try { - try { - streams.cleanUp(); - fail("Should have throw StreamsException"); - } catch (final StreamsException expected) { - assertTrue(expected.getCause() instanceof OverlappingFileLockException); - } + final StreamsException expected = assertThrows(StreamsException.class, streams::cleanUp); + assertThat(expected.getCause(), instanceOf(OverlappingFileLockException.class)); } finally { lockThread.releaseLock = true; lockThread.isRunning = false; @@ -851,12 +831,8 @@ public void shouldThrowOnCleanupWhileRunning() throws InterruptedException { () -> globalStreams.state() == KafkaStreams.State.RUNNING, "Streams never started."); - try { - globalStreams.cleanUp(); - fail("Should have thrown IllegalStateException"); - } catch (final IllegalStateException expected) { - assertEquals("Cannot clean up while running.", expected.getMessage()); - } + final IllegalStateException expected = assertThrows(IllegalStateException.class, globalStreams::cleanUp); + assertThat(expected.getMessage(), is("Cannot clean up while running.")); } @Test @@ -1037,13 +1013,13 @@ private void startStreamsAndCheckDirExists(final Topology topology, try { final List files = Files.find(basePath, 999, (p, bfa) -> !p.equals(basePath)).collect(Collectors.toList()); if (shouldFilesExist && files.isEmpty()) { - Assert.fail("Files should have existed, but it didn't: " + files); + fail("Files should have existed, but it didn't: " + files); } if (!shouldFilesExist && !files.isEmpty()) { - Assert.fail("Files should not have existed, but it did: " + files); + fail("Files should not have existed, but it did: " + files); } } catch (final IOException e) { - Assert.fail("Couldn't read the state directory : " + baseDir.getPath()); + fail("Couldn't read the state directory : " + baseDir.getPath()); } finally { streams.close(); streams.cleanUp(); @@ -1077,7 +1053,7 @@ public void onChange(final KafkaStreams.State newState, } } - private class LockThread extends Thread { + private static class LockThread extends Thread { private final File lockFile; volatile boolean isRunning = true; volatile boolean releaseLock = false; diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 27e225597b2fc..398860b4b3c40 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -16,12 +16,15 @@ */ package org.apache.kafka.streams; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; @@ -50,10 +53,11 @@ import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; import static org.apache.kafka.streams.StreamsConfig.producerPrefix; import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig; -import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -77,16 +81,47 @@ public void setUp() { streamsConfig = new StreamsConfig(props); } - @Test(expected = ConfigException.class) + @Test public void shouldThrowExceptionIfApplicationIdIsNotSet() { props.remove(StreamsConfig.APPLICATION_ID_CONFIG); + assertThrows(ConfigException.class, () -> new StreamsConfig(props)); + } + + @Test + public void testOsDefaultSocketBufferSizes() { + props.put(CommonClientConfigs.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE); + props.put(CommonClientConfigs.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE); new StreamsConfig(props); } - @Test(expected = ConfigException.class) + @Test + public void shouldThrowForInvalidSocketSendBufferSize() { + props.put(CommonClientConfigs.SEND_BUFFER_CONFIG, -2); + assertThrows(ConfigException.class, () -> new StreamsConfig(props)); + } + + @Test + public void testIllegalMetricsConfig() { + props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig"); + assertThrows(ConfigException.class, () -> new StreamsConfig(props)); + } + + @Test + public void shouldAllowToSetInfoMetricsConfig() { + props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString()); + new StreamsConfig(props); + } + + @Test + public void shouldAllowToSetDebugMetricsConfig() { + props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString()); + new StreamsConfig(props); + } + + @Test public void shouldThrowExceptionIfBootstrapServersIsNotSet() { props.remove(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); - new StreamsConfig(props); + assertThrows(ConfigException.class, () -> new StreamsConfig(props)); } @Test @@ -572,7 +607,6 @@ public void shouldSpecifyCorrectKeySerdeClassOnError() { } } - @SuppressWarnings("deprecation") @Test public void shouldSpecifyCorrectValueSerdeClassOnError() { final Properties props = getStreamsConfig(); From c00bf48f3b86cbc6cc46c700af94e208d0a0bf79 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 21 Aug 2019 15:27:31 -0700 Subject: [PATCH 3/4] Make sure we actually have a `StateDirectory` --- .../kafka/streams/KafkaStreamsTest.java | 134 ++++++------------ 1 file changed, 43 insertions(+), 91 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 2fb583802b771..fc139b5db0619 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -529,20 +529,20 @@ public void shouldReturnThreadMetadata() { } @Test - public void shouldAllowCleanupBeforeStartAndAfterClose() { + public void shouldAllowCleanupBeforeStartAndAfterClose() throws Exception { + final KafkaStreams streams = getStatefulApp(); try { - globalStreams.cleanUp(); - globalStreams.start(); + streams.cleanUp(); + streams.start(); } finally { - globalStreams.close(); + streams.close(); } - globalStreams.cleanUp(); + streams.cleanUp(); } @Test public void shouldAllowCleanupIfApplicationIsDown() throws Exception { - final StreamsBuilder builder = new StreamsBuilder(); - final KafkaStreams streams = new KafkaStreams(builder.build(), props); + final KafkaStreams streams = getStatefulApp(); final File taskDir = new File( props.getProperty(StreamsConfig.STATE_DIR_CONFIG) @@ -570,8 +570,7 @@ public void shouldAllowCleanupIfApplicationIsDown() throws Exception { @Test public void shouldAllowCleanupIfLockIsReleased() throws Exception { - final StreamsBuilder builder = new StreamsBuilder(); - final KafkaStreams streams = new KafkaStreams(builder.build(), props); + final KafkaStreams streams = getStatefulApp(); final File taskDir = new File( props.getProperty(StreamsConfig.STATE_DIR_CONFIG) @@ -609,8 +608,7 @@ public void shouldAllowCleanupIfLockIsReleased() throws Exception { @Test public void shouldAllowCleanupIfLockIsReleasedFromDifferentThread() throws Exception { - final StreamsBuilder builder = new StreamsBuilder(); - final KafkaStreams streams = new KafkaStreams(builder.build(), props); + final KafkaStreams streams = getStatefulApp(); final File taskDir = new File( props.getProperty(StreamsConfig.STATE_DIR_CONFIG) @@ -649,26 +647,7 @@ public void shouldAllowCleanupIfLockIsReleasedFromDifferentThread() throws Excep @Test public void shouldFailWithOverlappingFileLockException() throws Exception { - final StreamsBuilder builder = new StreamsBuilder(); - builder.addGlobalStore( - Stores.timestampedKeyValueStoreBuilder( - Stores.persistentTimestampedKeyValueStore("storeName"), - Serdes.ByteArray(), - Serdes.ByteArray() - ), - "topicName", - Consumed.as("someName"), - () -> new Processor() { - @Override - public void init(final ProcessorContext context) {} - - @Override - public void process(final Object key, final Object value) {} - - @Override - public void close() {} - }); - final KafkaStreams streams = new KafkaStreams(builder.build(), props); + final KafkaStreams streams = getStatefulApp(); final File taskDir = new File( props.getProperty(StreamsConfig.STATE_DIR_CONFIG) @@ -689,26 +668,7 @@ public void close() {} @Test public void shouldFailWithOverlappingFileLockExceptionWhenLockedByDifferentThread() throws Exception { - final StreamsBuilder builder = new StreamsBuilder(); - builder.addGlobalStore( - Stores.timestampedKeyValueStoreBuilder( - Stores.persistentTimestampedKeyValueStore("storeName"), - Serdes.ByteArray(), - Serdes.ByteArray() - ), - "topicName", - Consumed.as("someName"), - () -> new Processor() { - @Override - public void init(final ProcessorContext context) {} - - @Override - public void process(final Object key, final Object value) {} - - @Override - public void close() {} - }); - final KafkaStreams streams = new KafkaStreams(builder.build(), props); + final KafkaStreams streams = getStatefulApp(); final File taskDir = new File( props.getProperty(StreamsConfig.STATE_DIR_CONFIG) @@ -738,26 +698,7 @@ public void close() {} @Test public void shouldFailWithOverlappingFileLockExceptionForGlobalTask() throws Exception { - final StreamsBuilder builder = new StreamsBuilder(); - builder.addGlobalStore( - Stores.timestampedKeyValueStoreBuilder( - Stores.persistentTimestampedKeyValueStore("storeName"), - Serdes.ByteArray(), - Serdes.ByteArray() - ), - "topicName", - Consumed.as("someName"), - () -> new Processor() { - @Override - public void init(final ProcessorContext context) {} - - @Override - public void process(final Object key, final Object value) {} - - @Override - public void close() {} - }); - final KafkaStreams streams = new KafkaStreams(builder.build(), props); + final KafkaStreams streams = getStatefulApp(); final File taskDir = new File( props.getProperty(StreamsConfig.STATE_DIR_CONFIG) @@ -777,26 +718,7 @@ public void close() {} @Test public void shouldFailWithOverlappingFileLockExceptionForGlobalTaskWhenLockedByDifferentThread() throws Exception { - final StreamsBuilder builder = new StreamsBuilder(); - builder.addGlobalStore( - Stores.timestampedKeyValueStoreBuilder( - Stores.persistentTimestampedKeyValueStore("storeName"), - Serdes.ByteArray(), - Serdes.ByteArray() - ), - "topicName", - Consumed.as("someName"), - () -> new Processor() { - @Override - public void init(final ProcessorContext context) {} - - @Override - public void process(final Object key, final Object value) {} - - @Override - public void close() {} - }); - final KafkaStreams streams = new KafkaStreams(builder.build(), props); + final KafkaStreams streams = getStatefulApp(); final File taskDir = new File( props.getProperty(StreamsConfig.STATE_DIR_CONFIG) @@ -931,6 +853,36 @@ public void statefulTopologyShouldCreateStateDirectory() throws Exception { startStreamsAndCheckDirExists(topology, asList(inputTopic, globalTopicName), outputTopic, true); } + private KafkaStreams getStatefulApp() throws Exception { + final String inputTopic = testName.getMethodName() + "-input"; + final String outputTopic = testName.getMethodName() + "-output"; + final String globalTopicName = testName.getMethodName() + "-global"; + final String storeName = testName.getMethodName() + "-counts"; + final String globalStoreName = testName.getMethodName() + "-globalStore"; + final Topology topology = getStatefulTopology(inputTopic, outputTopic, globalTopicName, storeName, globalStoreName, true); + + final StreamsBuilder builder = new StreamsBuilder(); + builder.addGlobalStore( + Stores.timestampedKeyValueStoreBuilder( + Stores.persistentTimestampedKeyValueStore("storeName"), + Serdes.ByteArray(), + Serdes.ByteArray() + ), + "topicName", + Consumed.as("someName"), + () -> new Processor() { + @Override + public void init(final ProcessorContext context) {} + + @Override + public void process(final Object key, final Object value) {} + + @Override + public void close() {} + }); + return new KafkaStreams(builder.build(), props); + } + @SuppressWarnings("unchecked") private Topology getStatefulTopology(final String inputTopic, final String outputTopic, From 5833e75602a8c4e1bcae37842075ebcdb748f1e2 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 21 Aug 2019 15:54:43 -0700 Subject: [PATCH 4/4] foo --- .../kafka/streams/KafkaStreamsTest.java | 23 +------------------ 1 file changed, 1 insertion(+), 22 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index fc139b5db0619..1733004bf73dd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -30,8 +30,6 @@ import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.ThreadMetadata; import org.apache.kafka.streams.processor.internals.GlobalStreamThread; @@ -861,26 +859,7 @@ private KafkaStreams getStatefulApp() throws Exception { final String globalStoreName = testName.getMethodName() + "-globalStore"; final Topology topology = getStatefulTopology(inputTopic, outputTopic, globalTopicName, storeName, globalStoreName, true); - final StreamsBuilder builder = new StreamsBuilder(); - builder.addGlobalStore( - Stores.timestampedKeyValueStoreBuilder( - Stores.persistentTimestampedKeyValueStore("storeName"), - Serdes.ByteArray(), - Serdes.ByteArray() - ), - "topicName", - Consumed.as("someName"), - () -> new Processor() { - @Override - public void init(final ProcessorContext context) {} - - @Override - public void process(final Object key, final Object value) {} - - @Override - public void close() {} - }); - return new KafkaStreams(builder.build(), props); + return new KafkaStreams(topology, props); } @SuppressWarnings("unchecked")