From 80398b2385055c2edfbc2e8f10100d66e1035eac Mon Sep 17 00:00:00 2001 From: mynameborat Date: Wed, 6 Nov 2019 16:49:21 -0800 Subject: [PATCH 1/8] SAMZA-2305: Stream processor should ensure previous container is stopped during a rebalance --- .../apache/samza/storage/StorageEngine.java | 6 +- .../samza/processor/StreamProcessor.java | 55 ++++++++++++--- ...nTransactionalStateTaskRestoreManager.java | 2 +- .../apache/samza/storage/StorageRecovery.java | 7 +- .../samza/storage/TaskRestoreManager.java | 2 +- .../TransactionalStateTaskRestoreManager.java | 2 +- .../samza/container/SamzaContainer.scala | 22 +++++- .../storage/ContainerStorageManager.java | 54 ++++++++++----- .../apache/samza/system/SystemConsumers.scala | 17 ++++- .../apache/samza/system/SystemProducers.scala | 9 ++- .../samza/processor/TestStreamProcessor.java | 67 +++++++++++++++++-- .../samza/container/TestSamzaContainer.scala | 42 ++++++++++++ .../storage/TestContainerStorageManager.java | 4 +- .../storage/kv/KeyValueStorageEngine.scala | 7 +- 14 files changed, 250 insertions(+), 46 deletions(-) diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java index 8add1de570..7b12c85eee 100644 --- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java +++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java @@ -43,11 +43,15 @@ public interface StorageEngine { * provided in one {@link java.util.Iterator} and not deserialized for * efficiency, allowing the implementation to optimize replay, if possible. * + * The implementers are expected to handle interrupt signals to the restoration thread and rethrow the exception to + * upstream so that {@code TaskRestoreManager} can accordingly notify the container. + * * @param envelopes * An iterator of envelopes that the storage engine can read from to * restore its state on startup. + * @throws InterruptedException when received interrupts during restoration */ - void restore(ChangelogSSPIterator envelopes); + void restore(ChangelogSSPIterator envelopes) throws InterruptedException; /** * Flush any cached messages diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index f952f6eff5..1ef75071fc 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -133,7 +133,6 @@ public class StreamProcessor { private final Config config; private final long taskShutdownMs; private final String processorId; - private final ExecutorService containerExcecutorService; private final Object lock = new Object(); private final MetricsRegistryMap metricsRegistry; private final MetadataStore metadataStore; @@ -176,6 +175,9 @@ public State getState() { @VisibleForTesting JobCoordinatorListener jobCoordinatorListener = null; + @VisibleForTesting + ExecutorService containerExecutorService; + /** * Same as {@link #StreamProcessor(String, Config, Map, TaskFactory, ProcessorLifecycleListener, JobCoordinator)}, except * it creates a {@link JobCoordinator} instead of accepting it as an argument. @@ -288,11 +290,15 @@ public StreamProcessor(String processorId, Config config, Map @@ -346,9 +352,10 @@ public void stop() { try { LOGGER.info("Shutting down the container: {} of stream processor: {}.", container, processorId); boolean hasContainerShutdown = stopSamzaContainer(); + // todo: use interruptSamzaContainer method instead to give one more chance for container to shutdown if (!hasContainerShutdown) { LOGGER.info("Interrupting the container: {} thread to die.", container); - containerExcecutorService.shutdownNow(); + containerExecutorService.shutdownNow(); } } catch (Throwable throwable) { LOGGER.error(String.format("Exception occurred on container: %s shutdown of stream processor: %s.", container, processorId), throwable); @@ -444,6 +451,21 @@ private boolean stopSamzaContainer() { return hasContainerShutdown; } + private boolean interruptSamzaContainer() { + if (container != null) { + try { + containerExecutorService.shutdownNow(); + containerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOGGER.info("Received an interrupt during interrupting container. Proceeding to check if the container callback " + + "decremented the shutdown latch. "); + } + } + + // we call interrupt successful as long as the shut down latch is decremented by the container call back. + return containerShutdownLatch.getCount() == 0; + } + private JobCoordinatorListener createJobCoordinatorListener() { return new JobCoordinatorListener() { @@ -461,8 +483,25 @@ public void onJobModelExpired() { } else { LOGGER.info("Container: {} shutdown completed for stream processor: {}.", container, processorId); } + } else if (state == State.IN_REBALANCE) { + if (container != null) { + boolean hasContainerShutdown = interruptSamzaContainer(); + if (!hasContainerShutdown) { + LOGGER.warn("Job model expire unsuccessful. Failed to interrupt container: {} safely. " + + "Stopping the stream processor: {}", container, processorId); + state = State.STOPPING; + jobCoordinator.stop(); + } else { + Preconditions.checkState(containerExecutorService.isShutdown(), + "Executor service did not shutdown cleanly."); + containerExecutorService = createExecutorService(); + } + } else { + LOGGER.info("Ignoring Job model expired since a rebalance is already in progress"); + } } else { - LOGGER.info("Ignoring onJobModelExpired invocation since the current state is {} and not in {}.", state, ImmutableList.of(State.RUNNING, State.STARTED)); + LOGGER.info("Ignoring onJobModelExpired invocation since the current state is {} and not in {}.", state, + ImmutableList.of(State.RUNNING, State.STARTED, State.IN_REBALANCE)); } } } @@ -475,7 +514,7 @@ public void onNewJobModel(String processorId, JobModel jobModel) { container = createSamzaContainer(processorId, jobModel); container.setContainerListener(new ContainerListener()); LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId); - containerExcecutorService.submit(container); + containerExecutorService.submit(container); } else { LOGGER.info("Ignoring onNewJobModel invocation since the current state is {} and not {}.", state, State.IN_REBALANCE); } @@ -490,7 +529,7 @@ public void onCoordinatorStop() { // we only want to interrupt when container shutdown times out. if (!hasContainerShutdown) { - containerExcecutorService.shutdownNow(); + containerExecutorService.shutdownNow(); } state = State.STOPPED; } @@ -508,7 +547,7 @@ public void onCoordinatorFailure(Throwable throwable) { // we only want to interrupt when container shutdown times out. if (!hasContainerShutdown) { - containerExcecutorService.shutdownNow(); + containerExecutorService.shutdownNow(); } state = State.STOPPED; } diff --git a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java index 70c61746e8..d2f009798a 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java @@ -316,7 +316,7 @@ private String getStartingOffset(SystemStreamPartition systemStreamPartition, Sy * Restore each store in taskStoresToRestore sequentially */ @Override - public void restore() { + public void restore() throws InterruptedException { for (String storeName : taskStoresToRestore) { LOG.info("Restoring store: {} for task: {}", storeName, taskModel.getTaskName()); SystemConsumer systemConsumer = storeConsumers.get(storeName); diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java index 2cbeddcdf4..d2f3c12b43 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java @@ -117,7 +117,12 @@ public void run() { systemAdmins.start(); this.containerStorageManagers.forEach((containerName, containerStorageManager) -> { - containerStorageManager.start(); + try { + containerStorageManager.start(); + } catch (InterruptedException e) { + LOG.warn("Received an interrupt during store restoration for container {}." + + " Proceeding with the next container", containerName); + } }); this.containerStorageManagers.forEach((containerName, containerStorageManager) -> { containerStorageManager.shutdown(); diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskRestoreManager.java index 2bdeeead05..3435b5a1de 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/TaskRestoreManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/TaskRestoreManager.java @@ -36,7 +36,7 @@ public interface TaskRestoreManager { /** * Restore state from checkpoints, state snapshots and changelog. */ - void restore(); + void restore() throws InterruptedException; /** * Stop all persistent stores after restoring. diff --git a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java index 689d431cbd..8133fac7be 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java @@ -117,7 +117,7 @@ public void init(Map checkpointedChangelogOffsets } @Override - public void restore() { + public void restore() throws InterruptedException { Map storesToRestore = storeActions.storesToRestore; for (Map.Entry entry : storesToRestore.entrySet()) { diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index d15c1090f2..c45ea678bd 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -27,7 +27,6 @@ import java.time.Duration import java.util import java.util.{Base64, Optional} import java.util.concurrent.{ExecutorService, Executors, ScheduledExecutorService, TimeUnit} -import java.util.stream.Collectors import com.google.common.annotations.VisibleForTesting import com.google.common.util.concurrent.ThreadFactoryBuilder @@ -769,6 +768,27 @@ class SamzaContainer( else Thread.sleep(Long.MaxValue) } catch { + case e: InterruptedException => + /* + * We don't want to categorize interrupts as failure since the only place the container thread gets interrupted within + * our code is inside stream processor is during the following two scenarios + * 1. During a re-balance, if the container has not started or hasn't reported start status to StreamProcessor. + * Subsequently stream processor attempts to interrupt the container thread before proceeding to join the barrier + * to agree on the new work assignment. + * 2. During shutdown signals to stream processor (external or internal), the stream processor signals the container to + * shutdown and waits for `task.shutdown.ms` before forcefully shutting down the container executor service which in + * turn interrupts the container thread. + * + * In the both of these scenarios, the failure cause is either captured externally (timing out scenario) or internally + * (failed attempt to shut down the container). The act of interrupting the container thread is an explicit intent to shutdown + * the container since it is not capable of reacting to shutdown signals in all scenarios. + * + */ + if (status.equals(SamzaContainerStatus.STARTED)) { + warn("Received an interrupt in run loop.", e) + } else { + warn("Received an interrupt during initialization.", e) + } case e: Throwable => if (status.equals(SamzaContainerStatus.STARTED)) { error("Caught exception/error in run loop.", e) diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java index afd3e69bd8..4f66d614da 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java +++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java @@ -632,7 +632,7 @@ private Set getSideInputStorageManagers() { return this.sideInputStorageManagers.values().stream().collect(Collectors.toSet()); } - public void start() throws SamzaException { + public void start() throws SamzaException, InterruptedException { Map checkpointedChangelogSSPOffsets = new HashMap<>(); if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) { getTasks(containerModel, TaskMode.Active).forEach((taskName, taskModel) -> { @@ -657,7 +657,8 @@ public void start() throws SamzaException { } // Restoration of all stores, in parallel across tasks - private void restoreStores(Map checkpointedChangelogSSPOffsets) { + private void restoreStores(Map checkpointedChangelogSSPOffsets) + throws InterruptedException { LOG.info("Store Restore started"); // initialize each TaskStorageManager @@ -665,7 +666,7 @@ private void restoreStores(Map checkpointedChange taskStorageManager.init(checkpointedChangelogSSPOffsets)); // Start each store consumer once - this.storeConsumers.values().stream().distinct().forEach(systemConsumer -> systemConsumer.start()); + this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::start); // Create a thread pool for parallel restores (and stopping of persistent stores) ExecutorService executorService = Executors.newFixedThreadPool(this.parallelRestoreThreadPoolSize, @@ -684,6 +685,11 @@ private void restoreStores(Map checkpointedChange for (Future future : taskRestoreFutures) { try { future.get(); + } catch (InterruptedException e) { + LOG.warn("Received an interrupt during store restoration. Issuing interrupts to the store restoration workers to exit " + + "prematurely without restoring full state."); + executorService.shutdownNow(); + throw e; } catch (Exception e) { LOG.error("Exception when restoring ", e); throw new SamzaException("Exception when restoring ", e); @@ -693,7 +699,7 @@ private void restoreStores(Map checkpointedChange executorService.shutdown(); // Stop each store consumer once - this.storeConsumers.values().stream().distinct().forEach(systemConsumer -> systemConsumer.stop()); + this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::stop); // Now re-create persistent stores in read-write mode, leave non-persistent stores as-is recreatePersistentTaskStoresInReadWriteMode(this.containerModel, jobContext, containerContext, @@ -791,6 +797,14 @@ public void run() { } } catch (InterruptedException e) { + LOG.warn("Received an interrupt during side inputs store restoration." + + " Exiting prematurely without completing store restore."); + /* + * We want to stop side input restoration and rethrow the exception upstream. Container should handle the + * interrupt exception and shutdown the components and cleaning up the resource. We don't want to clean up the + * resources prematurely here. + */ + shouldShutdown = true; // todo: should we cancel the flush future right away or wait for container to handle it as part of shutdown sequence? throw new SamzaException("Side inputs read was interrupted", e); } @@ -920,22 +934,26 @@ public TaskRestoreCallable(SamzaContainerMetrics samzaContainerMetrics, TaskName @Override public Void call() { long startTime = System.currentTimeMillis(); - LOG.info("Starting stores in task instance {}", this.taskName.getTaskName()); - taskRestoreManager.restore(); - - // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted - // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation). - taskRestoreManager.stopPersistentStores(); - - long timeToRestore = System.currentTimeMillis() - startTime; - - if (this.samzaContainerMetrics != null) { - Gauge taskGauge = this.samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(this.taskName, null); - - if (taskGauge != null) { - taskGauge.set(timeToRestore); + try { + LOG.info("Starting stores in task instance {}", this.taskName.getTaskName()); + taskRestoreManager.restore(); + } catch (InterruptedException e) { + LOG.warn("Received an interrupt during store restoration for task: {}.", this.taskName.getTaskName()); + } finally { + // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted + // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation). + taskRestoreManager.stopPersistentStores(); + long timeToRestore = System.currentTimeMillis() - startTime; + + if (this.samzaContainerMetrics != null) { + Gauge taskGauge = this.samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(this.taskName, null); + + if (taskGauge != null) { + taskGauge.set(timeToRestore); + } } } + return null; } } diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala index b41c2454d8..d6decf1720 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala @@ -137,6 +137,13 @@ class SystemConsumers ( */ private val emptySystemStreamPartitionsBySystem = new HashMap[String, Set[SystemStreamPartition]]() + /** + * Denotes if the SystemConsumers have started. The flag is useful in the event of shutting down since interrupt + * on Samza Container will shutdown components and container currently doesn't track what components have started + * successfully. + */ + private var started = false + /** * Default timeout to noNewMessagesTimeout. Every time SystemConsumers * receives incoming messages, it sets timeout to 0. Every time @@ -185,15 +192,19 @@ class SystemConsumers ( chooser.start + started = true + refresh } def stop { - debug("Stopping consumers.") + if (started) { + debug("Stopping consumers.") - consumers.values.foreach(_.stop) + consumers.values.foreach(_.stop) - chooser.stop + chooser.stop + } } diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala index e8ce961af0..08ecdec060 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala @@ -35,16 +35,21 @@ class SystemProducers( */ dropSerializationError: Boolean = false) extends Logging { + private var started = false + def start { debug("Starting producers.") producers.values.foreach(_.start) + started = true } def stop { - debug("Stopping producers.") + if (started) { + debug("Stopping producers.") - producers.values.foreach(_.stop) + producers.values.foreach(_.stop) + } } def register(source: String) { diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java index 6d78b77d10..fa24a973a0 100644 --- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java +++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; @@ -54,9 +55,7 @@ import org.mockito.Mockito; import org.powermock.api.mockito.PowerMockito; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; @@ -467,13 +466,69 @@ public void testOnJobModelExpiredShouldMakeCorrectStateTransitions() { assertEquals(State.STOPPING, streamProcessor.getState()); Mockito.verify(mockSamzaContainer, Mockito.times(1)).shutdown(); Mockito.verify(mockJobCoordinator, Mockito.times(1)).stop(); + } - // If StreamProcessor is in IN_REBALANCE state, onJobModelExpired should be a NO_OP. - streamProcessor.state = State.IN_REBALANCE; + @Test + public void testJobModelExpiredDuringAnExistingRebalance() { + JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class); + ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class); + ExecutorService mockExecutorService = Mockito.mock(ExecutorService.class); + MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); + StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", config, new HashMap<>(), null, + Optional.empty(), Optional.empty(), Optional.empty(), sp -> lifecycleListener, mockJobCoordinator, Mockito.mock(MetadataStore.class)); - streamProcessor.jobCoordinatorListener.onJobModelExpired(); + runJobModelExpireDuringRebalance(streamProcessor, mockExecutorService, false); assertEquals(State.IN_REBALANCE, streamProcessor.state); + assertNotEquals(mockExecutorService, streamProcessor.containerExecutorService); + Mockito.verify(mockExecutorService, Mockito.times(1)).shutdownNow(); + Mockito.verify(mockExecutorService, Mockito.times(1)).isShutdown(); + } + + @Test + public void testJobModelExpiredDuringAnExistingRebalanceWithContainerInterruptFailed() { + JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class); + ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class); + ExecutorService mockExecutorService = Mockito.mock(ExecutorService.class); + MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); + StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", config, new HashMap<>(), null, + Optional.empty(), Optional.empty(), Optional.empty(), sp -> lifecycleListener, mockJobCoordinator, Mockito.mock(MetadataStore.class)); + + runJobModelExpireDuringRebalance(streamProcessor, mockExecutorService, true); + + assertEquals(State.STOPPING, streamProcessor.state); + assertEquals(mockExecutorService, streamProcessor.containerExecutorService); + Mockito.verify(mockExecutorService, Mockito.times(1)).shutdownNow(); + Mockito.verify(mockExecutorService, Mockito.times(0)).isShutdown(); + Mockito.verify(mockJobCoordinator, Mockito.times(1)).stop(); + } + + private void runJobModelExpireDuringRebalance(StreamProcessor streamProcessor, ExecutorService executorService, + boolean failContainerInterrupt) { + SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class); + CountDownLatch shutdownLatch = new CountDownLatch(1); + + /* + * When there is an initialized container that hasn't started and the stream processor is still in re-balance phase, + * subsequent job model expire request should attempt to interrupt the existing container to safely shut it down + * before proceeding to join the barrier. As part of safe shutdown sequence, we want to ensure shutdownNow is invoked + * on the existing executorService to signal interrupt and make sure new executor service is created. + */ + + Mockito.when(executorService.shutdownNow()).thenAnswer(ctx -> { + if (!failContainerInterrupt) { + shutdownLatch.countDown(); + } + return null; + }); + Mockito.when(executorService.isShutdown()).thenReturn(true); + + streamProcessor.state = State.IN_REBALANCE; + streamProcessor.container = mockSamzaContainer; + streamProcessor.containerExecutorService = executorService; + streamProcessor.containerShutdownLatch = shutdownLatch; + + streamProcessor.jobCoordinatorListener.onJobModelExpired(); } @Test diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index 78136bf03c..69223df290 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -35,6 +35,8 @@ import org.junit.Assert._ import org.junit.{Before, Test} import org.mockito.Matchers.{any, notNull} import org.mockito.Mockito._ +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer import org.mockito.{Mock, Mockito, MockitoAnnotations} import org.scalatest.junit.AssertionsForJUnit import org.scalatest.mockito.MockitoSugar @@ -139,6 +141,46 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { verify(this.runLoop).run() } + @Test + def testInterruptDuringStoreRestorationShutdownContainer(): Unit = { + when(this.containerStorageManager.start()) + .thenAnswer(new Answer[Void] { + override def answer(mock: InvocationOnMock): Void = { + Thread.sleep(1000) + throw new InterruptedException("Injecting interrupt into container storage manager") + } + }) + + this.samzaContainer.run + + assertEquals(SamzaContainerStatus.STOPPED, this.samzaContainer.getStatus()) + verify(this.samzaContainerListener).beforeStart() + verify(this.samzaContainerListener).afterStop() + verify(this.samzaContainerListener, never()).afterFailure(any()) + verify(this.runLoop, times(0)).run() + } + + @Test + def testInterruptDuringStoreRestorationWithErrorsDuringContainerShutdown(): Unit = { + when(this.containerStorageManager.start()) + .thenAnswer(new Answer[Void] { + override def answer(mock: InvocationOnMock): Void = { + Thread.sleep(1000) + throw new InterruptedException("Injecting interrupt into container storage manager") + } + }) + + when(this.taskInstance.shutdownTask).thenThrow(new RuntimeException("Trigger a shutdown, please.")) + + this.samzaContainer.run + + assertEquals(SamzaContainerStatus.FAILED, this.samzaContainer.getStatus()) + verify(this.samzaContainerListener).beforeStart() + verify(this.samzaContainerListener).afterFailure(any()) + verify(this.samzaContainerListener, never()).afterStop() + verify(this.runLoop, times(0)).run() + } + @Test def testFailureDuringShutdown(): Unit = { doNothing().when(this.runLoop).run() // run loop completes successfully diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java index 43872e906e..bdc7e0ee72 100644 --- a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java +++ b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java @@ -101,7 +101,7 @@ private void addMockedTask(String taskname, int changelogPartition) { * Method to create a containerStorageManager with mocked dependencies */ @Before - public void setUp() { + public void setUp() throws InterruptedException { taskRestoreMetricGauges = new HashMap<>(); this.tasks = new HashMap<>(); this.taskInstanceMetrics = new HashMap<>(); @@ -243,7 +243,7 @@ public Void answer(InvocationOnMock invocation) { } @Test - public void testParallelismAndMetrics() { + public void testParallelismAndMetrics() throws InterruptedException { this.containerStorageManager.start(); this.containerStorageManager.shutdown(); diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala index bc6778eed4..1a4c4fd7db 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala @@ -124,7 +124,7 @@ class KeyValueStorageEngine[K, V]( val batch = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](batchSize) var lastBatchFlushed = false - while(iterator.hasNext) { + while(iterator.hasNext && !Thread.currentThread().isInterrupted) { val envelope = iterator.next() val keyBytes = envelope.getKey.asInstanceOf[Array[Byte]] val valBytes = envelope.getMessage.asInstanceOf[Array[Byte]] @@ -200,6 +200,11 @@ class KeyValueStorageEngine[K, V]( // flush the store and the changelog producer flush() // TODO HIGH pmaheshw SAMZA-2338: Need a way to flush changelog producers. This only flushes the stores. + + if (Thread.currentThread().isInterrupted) { + warn("Received an interrupt during store restoration. Exiting without restore the full state.") + throw new InterruptedException("Received an interrupt during store restoration.") + } } def flush() = { From cb4f1bcb1a09fa85d33cbf9d0d33876b9c5e4e7c Mon Sep 17 00:00:00 2001 From: mynameborat Date: Wed, 6 Nov 2019 17:09:21 -0800 Subject: [PATCH 2/8] minor doc fixes --- .../main/scala/org/apache/samza/system/SystemProducers.scala | 5 +++++ .../org/apache/samza/storage/kv/KeyValueStorageEngine.scala | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala index 08ecdec060..8546e68ac0 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala @@ -35,6 +35,11 @@ class SystemProducers( */ dropSerializationError: Boolean = false) extends Logging { + /** + * Denotes if the SystemConsumers have started. The flag is useful in the event of shutting down since interrupt + * on Samza Container will shutdown components and container currently doesn't track what components have started + * successfully. + */ private var started = false def start { diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala index 1a4c4fd7db..afba824d3a 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala @@ -202,7 +202,7 @@ class KeyValueStorageEngine[K, V]( flush() // TODO HIGH pmaheshw SAMZA-2338: Need a way to flush changelog producers. This only flushes the stores. if (Thread.currentThread().isInterrupted) { - warn("Received an interrupt during store restoration. Exiting without restore the full state.") + warn("Received an interrupt during store restoration. Exiting without restoring the full state.") throw new InterruptedException("Received an interrupt during store restoration.") } } From 243b8482da0322d75122de04dc77f5df6808acaa Mon Sep 17 00:00:00 2001 From: mynameborat Date: Fri, 8 Nov 2019 15:30:14 -0800 Subject: [PATCH 3/8] Address Dan's feedback --- .../samza/processor/StreamProcessor.java | 19 ++++++++----------- .../apache/samza/system/SystemConsumers.scala | 4 ++++ .../apache/samza/system/SystemProducers.scala | 3 +++ 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 1ef75071fc..3aed8c7842 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -352,7 +352,6 @@ public void stop() { try { LOGGER.info("Shutting down the container: {} of stream processor: {}.", container, processorId); boolean hasContainerShutdown = stopSamzaContainer(); - // todo: use interruptSamzaContainer method instead to give one more chance for container to shutdown if (!hasContainerShutdown) { LOGGER.info("Interrupting the container: {} thread to die.", container); containerExecutorService.shutdownNow(); @@ -451,15 +450,13 @@ private boolean stopSamzaContainer() { return hasContainerShutdown; } - private boolean interruptSamzaContainer() { - if (container != null) { - try { - containerExecutorService.shutdownNow(); - containerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - LOGGER.info("Received an interrupt during interrupting container. Proceeding to check if the container callback " - + "decremented the shutdown latch. "); - } + private boolean interruptContainerAndShutdownExecutorService() { + try { + containerExecutorService.shutdownNow(); + containerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOGGER.info("Received an interrupt during interrupting container. Proceeding to check if the container callback " + + "decremented the shutdown latch. "); } // we call interrupt successful as long as the shut down latch is decremented by the container call back. @@ -485,7 +482,7 @@ public void onJobModelExpired() { } } else if (state == State.IN_REBALANCE) { if (container != null) { - boolean hasContainerShutdown = interruptSamzaContainer(); + boolean hasContainerShutdown = interruptContainerAndShutdownExecutorService(); if (!hasContainerShutdown) { LOGGER.warn("Job model expire unsuccessful. Failed to interrupt container: {} safely. " + "Stopping the stream processor: {}", container, processorId); diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala index d6decf1720..580fd605d8 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala @@ -204,6 +204,10 @@ class SystemConsumers ( consumers.values.foreach(_.stop) chooser.stop + + started = false + } else { + debug("Ignoring the consumers stop request since it never started.") } } diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala index 8546e68ac0..1744195fab 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala @@ -54,6 +54,9 @@ class SystemProducers( debug("Stopping producers.") producers.values.foreach(_.stop) + started = false + } else { + debug("Ignoring the producers stop request since it never started.") } } From 42b2a9aa2c75440718806d2b90c403d3f1f092bb Mon Sep 17 00:00:00 2001 From: mynameborat Date: Tue, 19 Nov 2019 12:05:06 -0800 Subject: [PATCH 4/8] Address review comments --- .../samza/processor/StreamProcessor.java | 22 ++++++++++--------- .../apache/samza/storage/StorageRecovery.java | 2 ++ .../samza/container/SamzaContainer.scala | 2 +- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 3aed8c7842..15890456fc 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -85,16 +85,18 @@ * * Describes the valid state transitions of the {@link StreamProcessor}. * - * - * ──────────────────────────────── - * │ │ - * │ │ - * │ │ - * │ │ - * New StreamProcessor.start() Rebalance triggered V Receives JobModel │ - * StreamProcessor ──────────▶ NEW ───────────────────────────▶ STARTED ──────────────────▶ IN_REBALANCE ─────────────────────▶ RUNNING - * Creation │ │ by group leader │ and starts Container │ - * │ │ │ │ + * Receives another re-balance request when the container + * from the previous re-balance is still in INIT phase + * ──────────────────────────────────────────────── + * │ │ │ + * │ │ │ + * │ │ │ + * │ │ │ + * New StreamProcessor.start() Rebalance triggered V Receives JobModel │ │ + * StreamProcessor ──────────▶ NEW ───────────────────────────▶ STARTED ──────────────────▶ IN_REBALANCE ─────────────────────▶ RUNNING │ + * Creation │ │ by group leader │ and starts │Container │ │ + * │ │ │ │ │ │ + * │ │ │ ─────────────────────────────── * Stre│amProcessor.stop() Stre│amProcessor.stop() Stre│amProcessor.stop() Stre│amProcessor.stop() * │ │ │ │ * │ │ │ │ diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java index d2f3c12b43..11237a8ae6 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java @@ -120,6 +120,8 @@ public void run() { try { containerStorageManager.start(); } catch (InterruptedException e) { + // we can ignore the exception since its only used in the context of a command line tool and bubbling the + // exception upstream isn't needed. LOG.warn("Received an interrupt during store restoration for container {}." + " Proceeding with the next container", containerName); } diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index c45ea678bd..092175da32 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -771,7 +771,7 @@ class SamzaContainer( case e: InterruptedException => /* * We don't want to categorize interrupts as failure since the only place the container thread gets interrupted within - * our code is inside stream processor is during the following two scenarios + * our code inside stream processor is during the following two scenarios * 1. During a re-balance, if the container has not started or hasn't reported start status to StreamProcessor. * Subsequently stream processor attempts to interrupt the container thread before proceeding to join the barrier * to agree on the new work assignment. From 15a56bf3a267eec5eee25dc9a97986758a9f0a4f Mon Sep 17 00:00:00 2001 From: mynameborat Date: Tue, 3 Dec 2019 16:39:49 -0800 Subject: [PATCH 5/8] Address Cameron's comments --- .../java/org/apache/samza/processor/StreamProcessor.java | 2 -- .../org/apache/samza/storage/TaskRestoreManager.java | 9 +++++++++ .../apache/samza/storage/ContainerStorageManager.java | 6 ++++++ .../org/apache/samza/processor/TestStreamProcessor.java | 5 ++++- 4 files changed, 19 insertions(+), 3 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 15890456fc..5a395207d0 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -491,8 +491,6 @@ public void onJobModelExpired() { state = State.STOPPING; jobCoordinator.stop(); } else { - Preconditions.checkState(containerExecutorService.isShutdown(), - "Executor service did not shutdown cleanly."); containerExecutorService = createExecutorService(); } } else { diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskRestoreManager.java index 3435b5a1de..f60e14876b 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/TaskRestoreManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/TaskRestoreManager.java @@ -35,6 +35,15 @@ public interface TaskRestoreManager { /** * Restore state from checkpoints, state snapshots and changelog. + * Currently, store restoration happens on a separate thread pool within {@code ContainerStorageManager}. In case of + * interrupt/shutdown signals from {@code SamzaContainer}, {@code ContainerStorageManager} may interrupt the restore + * thread. + * + * Note: Typically, interrupt signals don't bubble up as {@link InterruptedException} unless the restore thread is + * waiting on IO/network. In case of busy looping, implementors are expected to check the interrupt status of the + * thread periodically and shutdown gracefully before throwing {@link InterruptedException} upstream. + * {@code SamzaContainer} will not wait for clean up and the interrupt signal is the best effort by the container + * to notify that its shutting down. */ void restore() throws InterruptedException; diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java index 4f66d614da..8623e5d9c9 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java +++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java @@ -938,6 +938,12 @@ public Void call() { LOG.info("Starting stores in task instance {}", this.taskName.getTaskName()); taskRestoreManager.restore(); } catch (InterruptedException e) { + /* + * The container thread is the only external source to trigger an interrupt to the restoration thread and thus + * it is okay to swallow this exception and not propagate it upstream. If the container is interrupted during + * the store restoration, ContainerStorageManager signals the restore workers to abandon restoration and then + * finally propagates the exception upstream to trigger container shutdown. + */ LOG.warn("Received an interrupt during store restoration for task: {}.", this.taskName.getTaskName()); } finally { // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java index fa24a973a0..c61ec34890 100644 --- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java +++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java @@ -55,7 +55,10 @@ import org.mockito.Mockito; import org.powermock.api.mockito.PowerMockito; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; From 798325d714c4180cad27c5ad30471edd98351ad2 Mon Sep 17 00:00:00 2001 From: mynameborat Date: Mon, 9 Dec 2019 00:27:06 -0800 Subject: [PATCH 6/8] Removed assertion on isShutdown call on executor service --- .../java/org/apache/samza/processor/TestStreamProcessor.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java index c61ec34890..0978738eda 100644 --- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java +++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java @@ -485,7 +485,6 @@ public void testJobModelExpiredDuringAnExistingRebalance() { assertEquals(State.IN_REBALANCE, streamProcessor.state); assertNotEquals(mockExecutorService, streamProcessor.containerExecutorService); Mockito.verify(mockExecutorService, Mockito.times(1)).shutdownNow(); - Mockito.verify(mockExecutorService, Mockito.times(1)).isShutdown(); } @Test @@ -502,7 +501,6 @@ public void testJobModelExpiredDuringAnExistingRebalanceWithContainerInterruptFa assertEquals(State.STOPPING, streamProcessor.state); assertEquals(mockExecutorService, streamProcessor.containerExecutorService); Mockito.verify(mockExecutorService, Mockito.times(1)).shutdownNow(); - Mockito.verify(mockExecutorService, Mockito.times(0)).isShutdown(); Mockito.verify(mockJobCoordinator, Mockito.times(1)).stop(); } From 67c116d6b54e6631285927b8eb61efb16c0b9b59 Mon Sep 17 00:00:00 2001 From: mynameborat Date: Tue, 10 Dec 2019 11:27:19 -0800 Subject: [PATCH 7/8] Add unit test for storage engine restore --- .../kv/TestKeyValueStorageEngine.scala | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala index 25445c2912..081febba3a 100644 --- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala +++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala @@ -21,8 +21,10 @@ package org.apache.samza.storage.kv import java.io.File import java.util.Arrays +import java.util.concurrent.{Callable, ExecutionException, ExecutorService, Executors} import org.apache.samza.Partition +import org.apache.samza.context.Context import org.apache.samza.storage.StoreProperties import org.apache.samza.system.ChangelogSSPIterator.Mode import org.apache.samza.system.{ChangelogSSPIterator, IncomingMessageEnvelope, SystemStreamPartition} @@ -30,6 +32,8 @@ import org.apache.samza.task.MessageCollector import org.junit.Assert._ import org.junit.{After, Before, Test} import org.mockito.Mockito._ +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer class TestKeyValueStorageEngine { var engine: KeyValueStorageEngine[String, String] = null @@ -163,6 +167,50 @@ class TestKeyValueStorageEngine { assertEquals(15, metrics.restoredBytesGauge.getValue) // 3 keys * 2 bytes/key + 3 msgs * 3 bytes/msg } + @Test + def testRestoreInterruptedThrowsInterruptException(): Unit = { + val changelogSSP = new SystemStreamPartition("TestSystem", "TestStream", new Partition(0)) + val iterator = mock(classOf[ChangelogSSPIterator]) + val executorService = Executors.newSingleThreadExecutor() + val restore = new Callable[Void] { + override def call(): Void = { + engine.restore(iterator) + null + } + } + + when(iterator.hasNext) + .thenReturn(true) + .thenReturn(true) + .thenAnswer(new Answer[Boolean] { + override def answer(invocation: InvocationOnMock): Boolean = { + executorService.shutdownNow() + true + } + }) + .thenReturn(false) + when(iterator.next()) + .thenReturn(new IncomingMessageEnvelope(changelogSSP, "0", Array[Byte](1, 2), Array[Byte](3, 4, 5))) + .thenReturn(new IncomingMessageEnvelope(changelogSSP, "1", Array[Byte](2, 3), Array[Byte](4, 5, 6))) + .thenReturn(new IncomingMessageEnvelope(changelogSSP, "2", Array[Byte](3, 4), Array[Byte](5, 6, 7))) + when(iterator.getMode) + .thenReturn(Mode.RESTORE) + .thenReturn(Mode.RESTORE) + .thenReturn(Mode.RESTORE) + + try { + val restoreFuture = executorService.submit(restore) + restoreFuture.get() + } catch { + case e: ExecutionException => { + assertTrue(e.getCause.isInstanceOf[InterruptedException]) + // Make sure we don't restore any more records and bail out + assertEquals(2, metrics.restoredMessagesGauge.getValue) + } + case _: Throwable => fail("Expected execution exception during restoration") + } + } + @Test(expected = classOf[IllegalStateException]) def testThrowsIfIteratorModeChangesFromTrimToRestore(): Unit = { val changelogSSP = new SystemStreamPartition("TestSystem", "TestStream", new Partition(0)) From 6751b5358e9f67e53fb6259ccde07fc8c7b27e86 Mon Sep 17 00:00:00 2001 From: mynameborat Date: Tue, 10 Dec 2019 11:37:49 -0800 Subject: [PATCH 8/8] Address review comments --- .../org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala index 081febba3a..151fd846a4 100644 --- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala +++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala @@ -201,13 +201,13 @@ class TestKeyValueStorageEngine { try { val restoreFuture = executorService.submit(restore) restoreFuture.get() + fail("Expected execution exception during restoration") } catch { case e: ExecutionException => { assertTrue(e.getCause.isInstanceOf[InterruptedException]) // Make sure we don't restore any more records and bail out assertEquals(2, metrics.restoredMessagesGauge.getValue) } - case _: Throwable => fail("Expected execution exception during restoration") } }