diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java index 8add1de570..7b12c85eee 100644 --- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java +++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java @@ -43,11 +43,15 @@ public interface StorageEngine { * provided in one {@link java.util.Iterator} and not deserialized for * efficiency, allowing the implementation to optimize replay, if possible. * + * The implementers are expected to handle interrupt signals to the restoration thread and rethrow the exception to + * upstream so that {@code TaskRestoreManager} can accordingly notify the container. + * * @param envelopes * An iterator of envelopes that the storage engine can read from to * restore its state on startup. + * @throws InterruptedException when received interrupts during restoration */ - void restore(ChangelogSSPIterator envelopes); + void restore(ChangelogSSPIterator envelopes) throws InterruptedException; /** * Flush any cached messages diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index f952f6eff5..5a395207d0 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -85,16 +85,18 @@ * * Describes the valid state transitions of the {@link StreamProcessor}. * - * - * ──────────────────────────────── - * │ │ - * │ │ - * │ │ - * │ │ - * New StreamProcessor.start() Rebalance triggered V Receives JobModel │ - * StreamProcessor ──────────▶ NEW ───────────────────────────▶ STARTED ──────────────────▶ IN_REBALANCE ─────────────────────▶ RUNNING - * Creation │ │ by group leader │ and starts Container │ - * │ │ │ │ + * Receives another re-balance request when the container + * from the previous re-balance is still in INIT phase + * ──────────────────────────────────────────────── + * │ │ │ + * │ │ │ + * │ │ │ + * │ │ │ + * New StreamProcessor.start() Rebalance triggered V Receives JobModel │ │ + * StreamProcessor ──────────▶ NEW ───────────────────────────▶ STARTED ──────────────────▶ IN_REBALANCE ─────────────────────▶ RUNNING │ + * Creation │ │ by group leader │ and starts │Container │ │ + * │ │ │ │ │ │ + * │ │ │ ─────────────────────────────── * Stre│amProcessor.stop() Stre│amProcessor.stop() Stre│amProcessor.stop() Stre│amProcessor.stop() * │ │ │ │ * │ │ │ │ @@ -133,7 +135,6 @@ public class StreamProcessor { private final Config config; private final long taskShutdownMs; private final String processorId; - private final ExecutorService containerExcecutorService; private final Object lock = new Object(); private final MetricsRegistryMap metricsRegistry; private final MetadataStore metadataStore; @@ -176,6 +177,9 @@ public State getState() { @VisibleForTesting JobCoordinatorListener jobCoordinatorListener = null; + @VisibleForTesting + ExecutorService containerExecutorService; + /** * Same as {@link #StreamProcessor(String, Config, Map, TaskFactory, ProcessorLifecycleListener, JobCoordinator)}, except * it creates a {@link JobCoordinator} instead of accepting it as an argument. 
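Since restore(ChangelogSSPIterator) now declares InterruptedException, here is a minimal sketch of an implementation loop that cooperates with the contract described in the javadoc above. The class name and the putRawEntry helper are illustrative and not part of this patch.

    import org.apache.samza.system.ChangelogSSPIterator;
    import org.apache.samza.system.IncomingMessageEnvelope;

    // Illustrative only: a bare-bones restore loop honouring the new contract.
    public class InterruptAwareRestoreExample {

      public void restore(ChangelogSSPIterator envelopes) throws InterruptedException {
        while (envelopes.hasNext()) {
          // The restoration thread is interrupted via Thread.interrupt(), so check the flag between
          // envelopes and rethrow so that TaskRestoreManager can notify the container.
          if (Thread.currentThread().isInterrupted()) {
            throw new InterruptedException("Interrupted while restoring state from the changelog.");
          }
          IncomingMessageEnvelope envelope = envelopes.next();
          byte[] key = (byte[]) envelope.getKey();
          byte[] value = (byte[]) envelope.getMessage();
          putRawEntry(key, value);
        }
      }

      // Hypothetical helper standing in for a raw write to the underlying store.
      private void putRawEntry(byte[] key, byte[] value) {
      }
    }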
@@ -288,11 +292,15 @@ public StreamProcessor(String processorId, Config config, Map @@ -348,7 +356,7 @@ public void stop() { boolean hasContainerShutdown = stopSamzaContainer(); if (!hasContainerShutdown) { LOGGER.info("Interrupting the container: {} thread to die.", container); - containerExcecutorService.shutdownNow(); + containerExecutorService.shutdownNow(); } } catch (Throwable throwable) { LOGGER.error(String.format("Exception occurred on container: %s shutdown of stream processor: %s.", container, processorId), throwable); @@ -444,6 +452,19 @@ private boolean stopSamzaContainer() { return hasContainerShutdown; } + private boolean interruptContainerAndShutdownExecutorService() { + try { + containerExecutorService.shutdownNow(); + containerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOGGER.info("Received an interrupt during interrupting container. Proceeding to check if the container callback " + + "decremented the shutdown latch. "); + } + + // we call interrupt successful as long as the shut down latch is decremented by the container call back. + return containerShutdownLatch.getCount() == 0; + } + private JobCoordinatorListener createJobCoordinatorListener() { return new JobCoordinatorListener() { @@ -461,8 +482,23 @@ public void onJobModelExpired() { } else { LOGGER.info("Container: {} shutdown completed for stream processor: {}.", container, processorId); } + } else if (state == State.IN_REBALANCE) { + if (container != null) { + boolean hasContainerShutdown = interruptContainerAndShutdownExecutorService(); + if (!hasContainerShutdown) { + LOGGER.warn("Job model expire unsuccessful. Failed to interrupt container: {} safely. " + + "Stopping the stream processor: {}", container, processorId); + state = State.STOPPING; + jobCoordinator.stop(); + } else { + containerExecutorService = createExecutorService(); + } + } else { + LOGGER.info("Ignoring Job model expired since a rebalance is already in progress"); + } } else { - LOGGER.info("Ignoring onJobModelExpired invocation since the current state is {} and not in {}.", state, ImmutableList.of(State.RUNNING, State.STARTED)); + LOGGER.info("Ignoring onJobModelExpired invocation since the current state is {} and not in {}.", state, + ImmutableList.of(State.RUNNING, State.STARTED, State.IN_REBALANCE)); } } } @@ -475,7 +511,7 @@ public void onNewJobModel(String processorId, JobModel jobModel) { container = createSamzaContainer(processorId, jobModel); container.setContainerListener(new ContainerListener()); LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId); - containerExcecutorService.submit(container); + containerExecutorService.submit(container); } else { LOGGER.info("Ignoring onNewJobModel invocation since the current state is {} and not {}.", state, State.IN_REBALANCE); } @@ -490,7 +526,7 @@ public void onCoordinatorStop() { // we only want to interrupt when container shutdown times out. if (!hasContainerShutdown) { - containerExcecutorService.shutdownNow(); + containerExecutorService.shutdownNow(); } state = State.STOPPED; } @@ -508,7 +544,7 @@ public void onCoordinatorFailure(Throwable throwable) { // we only want to interrupt when container shutdown times out. 
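The interruptContainerAndShutdownExecutorService() helper added above relies on a shutdownNow-plus-latch handshake. The following self-contained sketch uses illustrative names (it is not StreamProcessor code) to show why a latch count of zero is a reliable signal that the container thread reacted to the interrupt.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    // Standalone sketch with illustrative names; not StreamProcessor code.
    public class InterruptAndAwaitSketch {
      public static void main(String[] args) throws InterruptedException {
        CountDownLatch containerShutdownLatch = new CountDownLatch(1);
        ExecutorService containerExecutorService = Executors.newSingleThreadExecutor();

        containerExecutorService.submit(() -> {
          try {
            Thread.sleep(Long.MAX_VALUE); // stand-in for a container stuck before reporting started
          } catch (InterruptedException e) {
            // In StreamProcessor it is the container lifecycle callback that decrements the latch.
            containerShutdownLatch.countDown();
          }
        });

        containerExecutorService.shutdownNow(); // delivers the interrupt to the container thread
        containerShutdownLatch.await(5000, TimeUnit.MILLISECONDS);

        // Mirrors interruptContainerAndShutdownExecutorService(): success means the latch reached zero.
        System.out.println("Container reacted to the interrupt: " + (containerShutdownLatch.getCount() == 0));
      }
    }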
if (!hasContainerShutdown) { - containerExcecutorService.shutdownNow(); + containerExecutorService.shutdownNow(); } state = State.STOPPED; } diff --git a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java index 70c61746e8..d2f009798a 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java @@ -316,7 +316,7 @@ private String getStartingOffset(SystemStreamPartition systemStreamPartition, Sy * Restore each store in taskStoresToRestore sequentially */ @Override - public void restore() { + public void restore() throws InterruptedException { for (String storeName : taskStoresToRestore) { LOG.info("Restoring store: {} for task: {}", storeName, taskModel.getTaskName()); SystemConsumer systemConsumer = storeConsumers.get(storeName); diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java index 2cbeddcdf4..11237a8ae6 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java @@ -117,7 +117,14 @@ public void run() { systemAdmins.start(); this.containerStorageManagers.forEach((containerName, containerStorageManager) -> { - containerStorageManager.start(); + try { + containerStorageManager.start(); + } catch (InterruptedException e) { + // we can ignore the exception since it's only used in the context of a command line tool and bubbling the + // exception upstream isn't needed. + LOG.warn("Received an interrupt during store restoration for container {}." + + " Proceeding with the next container", containerName); + } }); this.containerStorageManagers.forEach((containerName, containerStorageManager) -> { containerStorageManager.shutdown(); diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskRestoreManager.java index 2bdeeead05..f60e14876b 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/TaskRestoreManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/TaskRestoreManager.java @@ -35,8 +35,17 @@ public interface TaskRestoreManager { /** * Restore state from checkpoints, state snapshots and changelog. + * Currently, store restoration happens on a separate thread pool within {@code ContainerStorageManager}. In case of + * interrupt/shutdown signals from {@code SamzaContainer}, {@code ContainerStorageManager} may interrupt the restore + * thread. + * + * Note: Typically, interrupt signals don't bubble up as {@link InterruptedException} unless the restore thread is + * waiting on IO/network. In case of busy looping, implementors are expected to check the interrupt status of the + * thread periodically and shut down gracefully before throwing {@link InterruptedException} upstream. + * {@code SamzaContainer} will not wait for cleanup and the interrupt signal is the best effort by the container + * to notify that it's shutting down. */ - void restore(); + void restore() throws InterruptedException; /** * Stop all persistent stores after restoring.
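The thread-pool contract described in the TaskRestoreManager javadoc above can be distilled into a standalone sketch: a coordinator that interrupts in-flight workers when it is itself interrupted, and a worker that checks its interrupt flag between batches. All names here are illustrative stand-ins, not the ContainerStorageManager implementation.

    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.stream.Collectors;
    import org.apache.samza.SamzaException;

    // Illustrative coordinator/worker pair; not the ContainerStorageManager implementation.
    public class ParallelRestoreSketch {

      // The coordinator fans restore work out to a pool and, if interrupted while waiting,
      // signals the in-flight workers via shutdownNow() before rethrowing, as described above.
      public static void restoreAll(List<Callable<Void>> restoreTasks, int poolSize) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(poolSize);
        List<Future<Void>> futures = restoreTasks.stream().map(executorService::submit).collect(Collectors.toList());
        try {
          for (Future<Void> future : futures) {
            future.get();
          }
        } catch (InterruptedException e) {
          executorService.shutdownNow();
          throw e;
        } catch (ExecutionException e) {
          throw new SamzaException("Exception when restoring", e.getCause());
        } finally {
          executorService.shutdown();
        }
      }

      // A worker that is not blocked on IO checks its interrupt flag periodically and surfaces
      // InterruptedException upstream instead of silently finishing a partial restore.
      public static Callable<Void> restoreWorker(Runnable restoreOneBatch, int totalBatches) {
        return () -> {
          for (int i = 0; i < totalBatches; i++) {
            if (Thread.currentThread().isInterrupted()) {
              throw new InterruptedException("Restore interrupted before completing all batches.");
            }
            restoreOneBatch.run();
          }
          return null;
        };
      }
    }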
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java index 689d431cbd..8133fac7be 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java @@ -117,7 +117,7 @@ public void init(Map checkpointedChangelogOffsets } @Override - public void restore() { + public void restore() throws InterruptedException { Map storesToRestore = storeActions.storesToRestore; for (Map.Entry entry : storesToRestore.entrySet()) { diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index d15c1090f2..092175da32 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -27,7 +27,6 @@ import java.time.Duration import java.util import java.util.{Base64, Optional} import java.util.concurrent.{ExecutorService, Executors, ScheduledExecutorService, TimeUnit} -import java.util.stream.Collectors import com.google.common.annotations.VisibleForTesting import com.google.common.util.concurrent.ThreadFactoryBuilder @@ -769,6 +768,27 @@ class SamzaContainer( else Thread.sleep(Long.MaxValue) } catch { + case e: InterruptedException => + /* + * We don't want to categorize interrupts as failure since the only place the container thread gets interrupted within + * our code inside stream processor is during the following two scenarios + * 1. During a re-balance, if the container has not started or hasn't reported start status to StreamProcessor. + * Subsequently stream processor attempts to interrupt the container thread before proceeding to join the barrier + * to agree on the new work assignment. + * 2. During shutdown signals to stream processor (external or internal), the stream processor signals the container to + * shutdown and waits for `task.shutdown.ms` before forcefully shutting down the container executor service which in + * turn interrupts the container thread. + * + * In the both of these scenarios, the failure cause is either captured externally (timing out scenario) or internally + * (failed attempt to shut down the container). The act of interrupting the container thread is an explicit intent to shutdown + * the container since it is not capable of reacting to shutdown signals in all scenarios. 
+ * + */ + if (status.equals(SamzaContainerStatus.STARTED)) { + warn("Received an interrupt in run loop.", e) + } else { + warn("Received an interrupt during initialization.", e) + } case e: Throwable => if (status.equals(SamzaContainerStatus.STARTED)) { error("Caught exception/error in run loop.", e) diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java index afd3e69bd8..8623e5d9c9 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java +++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java @@ -632,7 +632,7 @@ private Set getSideInputStorageManagers() { return this.sideInputStorageManagers.values().stream().collect(Collectors.toSet()); } - public void start() throws SamzaException { + public void start() throws SamzaException, InterruptedException { Map checkpointedChangelogSSPOffsets = new HashMap<>(); if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) { getTasks(containerModel, TaskMode.Active).forEach((taskName, taskModel) -> { @@ -657,7 +657,8 @@ public void start() throws SamzaException { } // Restoration of all stores, in parallel across tasks - private void restoreStores(Map checkpointedChangelogSSPOffsets) { + private void restoreStores(Map checkpointedChangelogSSPOffsets) + throws InterruptedException { LOG.info("Store Restore started"); // initialize each TaskStorageManager @@ -665,7 +666,7 @@ private void restoreStores(Map checkpointedChange taskStorageManager.init(checkpointedChangelogSSPOffsets)); // Start each store consumer once - this.storeConsumers.values().stream().distinct().forEach(systemConsumer -> systemConsumer.start()); + this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::start); // Create a thread pool for parallel restores (and stopping of persistent stores) ExecutorService executorService = Executors.newFixedThreadPool(this.parallelRestoreThreadPoolSize, @@ -684,6 +685,11 @@ private void restoreStores(Map checkpointedChange for (Future future : taskRestoreFutures) { try { future.get(); + } catch (InterruptedException e) { + LOG.warn("Received an interrupt during store restoration. Issuing interrupts to the store restoration workers to exit " + + "prematurely without restoring full state."); + executorService.shutdownNow(); + throw e; } catch (Exception e) { LOG.error("Exception when restoring ", e); throw new SamzaException("Exception when restoring ", e); @@ -693,7 +699,7 @@ private void restoreStores(Map checkpointedChange executorService.shutdown(); // Stop each store consumer once - this.storeConsumers.values().stream().distinct().forEach(systemConsumer -> systemConsumer.stop()); + this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::stop); // Now re-create persistent stores in read-write mode, leave non-persistent stores as-is recreatePersistentTaskStoresInReadWriteMode(this.containerModel, jobContext, containerContext, @@ -791,6 +797,14 @@ public void run() { } } catch (InterruptedException e) { + LOG.warn("Received an interrupt during side inputs store restoration." + + " Exiting prematurely without completing store restore."); + /* + * We want to stop side input restoration and rethrow the exception upstream. Container should handle the + * interrupt exception and shutdown the components and cleaning up the resource. We don't want to clean up the + * resources prematurely here. 
+ */ + shouldShutdown = true; // todo: should we cancel the flush future right away or wait for container to handle it as part of shutdown sequence? throw new SamzaException("Side inputs read was interrupted", e); } @@ -920,22 +934,32 @@ public TaskRestoreCallable(SamzaContainerMetrics samzaContainerMetrics, TaskName @Override public Void call() { long startTime = System.currentTimeMillis(); - LOG.info("Starting stores in task instance {}", this.taskName.getTaskName()); - taskRestoreManager.restore(); - - // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted - // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation). - taskRestoreManager.stopPersistentStores(); - - long timeToRestore = System.currentTimeMillis() - startTime; - - if (this.samzaContainerMetrics != null) { - Gauge taskGauge = this.samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(this.taskName, null); - - if (taskGauge != null) { - taskGauge.set(timeToRestore); + try { + LOG.info("Starting stores in task instance {}", this.taskName.getTaskName()); + taskRestoreManager.restore(); + } catch (InterruptedException e) { + /* + * The container thread is the only external source to trigger an interrupt to the restoration thread and thus + * it is okay to swallow this exception and not propagate it upstream. If the container is interrupted during + * the store restoration, ContainerStorageManager signals the restore workers to abandon restoration and then + * finally propagates the exception upstream to trigger container shutdown. + */ + LOG.warn("Received an interrupt during store restoration for task: {}.", this.taskName.getTaskName()); + } finally { + // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted + // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation). + taskRestoreManager.stopPersistentStores(); + long timeToRestore = System.currentTimeMillis() - startTime; + + if (this.samzaContainerMetrics != null) { + Gauge taskGauge = this.samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(this.taskName, null); + + if (taskGauge != null) { + taskGauge.set(timeToRestore); + } } } + return null; } } diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala index b41c2454d8..580fd605d8 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala @@ -137,6 +137,13 @@ class SystemConsumers ( */ private val emptySystemStreamPartitionsBySystem = new HashMap[String, Set[SystemStreamPartition]]() + /** + * Denotes if the SystemConsumers have started. The flag is useful in the event of shutting down since interrupt + * on Samza Container will shutdown components and container currently doesn't track what components have started + * successfully. + */ + private var started = false + /** * Default timeout to noNewMessagesTimeout. Every time SystemConsumers * receives incoming messages, it sets timeout to 0. 
Every time @@ -185,15 +192,23 @@ class SystemConsumers ( chooser.start + started = true + refresh } def stop { - debug("Stopping consumers.") + if (started) { + debug("Stopping consumers.") - consumers.values.foreach(_.stop) + consumers.values.foreach(_.stop) - chooser.stop + chooser.stop + + started = false + } else { + debug("Ignoring the consumers stop request since it never started.") + } } diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala index e8ce961af0..1744195fab 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemProducers.scala @@ -35,16 +35,29 @@ class SystemProducers( */ dropSerializationError: Boolean = false) extends Logging { + /** + * Denotes if the SystemProducers have started. The flag is useful in the event of shutting down since interrupt + * on Samza Container will shutdown components and container currently doesn't track what components have started + * successfully. + */ + private var started = false + def start { debug("Starting producers.") producers.values.foreach(_.start) + started = true } def stop { - debug("Stopping producers.") + if (started) { + debug("Stopping producers.") - producers.values.foreach(_.stop) + producers.values.foreach(_.stop) + started = false + } else { + debug("Ignoring the producers stop request since it never started.") + } } def register(source: String) { diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java index 6d78b77d10..0978738eda 100644 --- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java +++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; @@ -55,6 +56,7 @@ import org.powermock.api.mockito.PowerMockito; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -467,13 +469,67 @@ public void testOnJobModelExpiredShouldMakeCorrectStateTransitions() { assertEquals(State.STOPPING, streamProcessor.getState()); Mockito.verify(mockSamzaContainer, Mockito.times(1)).shutdown(); Mockito.verify(mockJobCoordinator, Mockito.times(1)).stop(); + } - // If StreamProcessor is in IN_REBALANCE state, onJobModelExpired should be a NO_OP.
- streamProcessor.state = State.IN_REBALANCE; + @Test + public void testJobModelExpiredDuringAnExistingRebalance() { + JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class); + ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class); + ExecutorService mockExecutorService = Mockito.mock(ExecutorService.class); + MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); + StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", config, new HashMap<>(), null, + Optional.empty(), Optional.empty(), Optional.empty(), sp -> lifecycleListener, mockJobCoordinator, Mockito.mock(MetadataStore.class)); - streamProcessor.jobCoordinatorListener.onJobModelExpired(); + runJobModelExpireDuringRebalance(streamProcessor, mockExecutorService, false); assertEquals(State.IN_REBALANCE, streamProcessor.state); + assertNotEquals(mockExecutorService, streamProcessor.containerExecutorService); + Mockito.verify(mockExecutorService, Mockito.times(1)).shutdownNow(); + } + + @Test + public void testJobModelExpiredDuringAnExistingRebalanceWithContainerInterruptFailed() { + JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class); + ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class); + ExecutorService mockExecutorService = Mockito.mock(ExecutorService.class); + MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); + StreamProcessor streamProcessor = new StreamProcessor("TestProcessorId", config, new HashMap<>(), null, + Optional.empty(), Optional.empty(), Optional.empty(), sp -> lifecycleListener, mockJobCoordinator, Mockito.mock(MetadataStore.class)); + + runJobModelExpireDuringRebalance(streamProcessor, mockExecutorService, true); + + assertEquals(State.STOPPING, streamProcessor.state); + assertEquals(mockExecutorService, streamProcessor.containerExecutorService); + Mockito.verify(mockExecutorService, Mockito.times(1)).shutdownNow(); + Mockito.verify(mockJobCoordinator, Mockito.times(1)).stop(); + } + + private void runJobModelExpireDuringRebalance(StreamProcessor streamProcessor, ExecutorService executorService, + boolean failContainerInterrupt) { + SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class); + CountDownLatch shutdownLatch = new CountDownLatch(1); + + /* + * When there is an initialized container that hasn't started and the stream processor is still in re-balance phase, + * subsequent job model expire request should attempt to interrupt the existing container to safely shut it down + * before proceeding to join the barrier. As part of safe shutdown sequence, we want to ensure shutdownNow is invoked + * on the existing executorService to signal interrupt and make sure new executor service is created. 
+ */ + + Mockito.when(executorService.shutdownNow()).thenAnswer(ctx -> { + if (!failContainerInterrupt) { + shutdownLatch.countDown(); + } + return null; + }); + Mockito.when(executorService.isShutdown()).thenReturn(true); + + streamProcessor.state = State.IN_REBALANCE; + streamProcessor.container = mockSamzaContainer; + streamProcessor.containerExecutorService = executorService; + streamProcessor.containerShutdownLatch = shutdownLatch; + + streamProcessor.jobCoordinatorListener.onJobModelExpired(); } @Test diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index 78136bf03c..69223df290 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -35,6 +35,8 @@ import org.junit.Assert._ import org.junit.{Before, Test} import org.mockito.Matchers.{any, notNull} import org.mockito.Mockito._ +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer import org.mockito.{Mock, Mockito, MockitoAnnotations} import org.scalatest.junit.AssertionsForJUnit import org.scalatest.mockito.MockitoSugar @@ -139,6 +141,46 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { verify(this.runLoop).run() } + @Test + def testInterruptDuringStoreRestorationShutdownContainer(): Unit = { + when(this.containerStorageManager.start()) + .thenAnswer(new Answer[Void] { + override def answer(mock: InvocationOnMock): Void = { + Thread.sleep(1000) + throw new InterruptedException("Injecting interrupt into container storage manager") + } + }) + + this.samzaContainer.run + + assertEquals(SamzaContainerStatus.STOPPED, this.samzaContainer.getStatus()) + verify(this.samzaContainerListener).beforeStart() + verify(this.samzaContainerListener).afterStop() + verify(this.samzaContainerListener, never()).afterFailure(any()) + verify(this.runLoop, times(0)).run() + } + + @Test + def testInterruptDuringStoreRestorationWithErrorsDuringContainerShutdown(): Unit = { + when(this.containerStorageManager.start()) + .thenAnswer(new Answer[Void] { + override def answer(mock: InvocationOnMock): Void = { + Thread.sleep(1000) + throw new InterruptedException("Injecting interrupt into container storage manager") + } + }) + + when(this.taskInstance.shutdownTask).thenThrow(new RuntimeException("Trigger a shutdown, please.")) + + this.samzaContainer.run + + assertEquals(SamzaContainerStatus.FAILED, this.samzaContainer.getStatus()) + verify(this.samzaContainerListener).beforeStart() + verify(this.samzaContainerListener).afterFailure(any()) + verify(this.samzaContainerListener, never()).afterStop() + verify(this.runLoop, times(0)).run() + } + @Test def testFailureDuringShutdown(): Unit = { doNothing().when(this.runLoop).run() // run loop completes successfully diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java index 43872e906e..bdc7e0ee72 100644 --- a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java +++ b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java @@ -101,7 +101,7 @@ private void addMockedTask(String taskname, int changelogPartition) { * Method to create a containerStorageManager with mocked dependencies */ @Before - public void setUp() { + public void setUp() throws 
InterruptedException { taskRestoreMetricGauges = new HashMap<>(); this.tasks = new HashMap<>(); this.taskInstanceMetrics = new HashMap<>(); @@ -243,7 +243,7 @@ public Void answer(InvocationOnMock invocation) { } @Test - public void testParallelismAndMetrics() { + public void testParallelismAndMetrics() throws InterruptedException { this.containerStorageManager.start(); this.containerStorageManager.shutdown(); diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala index bc6778eed4..afba824d3a 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala @@ -124,7 +124,7 @@ class KeyValueStorageEngine[K, V]( val batch = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](batchSize) var lastBatchFlushed = false - while(iterator.hasNext) { + while(iterator.hasNext && !Thread.currentThread().isInterrupted) { val envelope = iterator.next() val keyBytes = envelope.getKey.asInstanceOf[Array[Byte]] val valBytes = envelope.getMessage.asInstanceOf[Array[Byte]] @@ -200,6 +200,11 @@ class KeyValueStorageEngine[K, V]( // flush the store and the changelog producer flush() // TODO HIGH pmaheshw SAMZA-2338: Need a way to flush changelog producers. This only flushes the stores. + + if (Thread.currentThread().isInterrupted) { + warn("Received an interrupt during store restoration. Exiting without restoring the full state.") + throw new InterruptedException("Received an interrupt during store restoration.") + } } def flush() = { diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala index 25445c2912..151fd846a4 100644 --- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala +++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala @@ -21,8 +21,10 @@ package org.apache.samza.storage.kv import java.io.File import java.util.Arrays +import java.util.concurrent.{Callable, ExecutionException, ExecutorService, Executors} import org.apache.samza.Partition +import org.apache.samza.context.Context import org.apache.samza.storage.StoreProperties import org.apache.samza.system.ChangelogSSPIterator.Mode import org.apache.samza.system.{ChangelogSSPIterator, IncomingMessageEnvelope, SystemStreamPartition} @@ -30,6 +32,8 @@ import org.apache.samza.task.MessageCollector import org.junit.Assert._ import org.junit.{After, Before, Test} import org.mockito.Mockito._ +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer class TestKeyValueStorageEngine { var engine: KeyValueStorageEngine[String, String] = null @@ -163,6 +167,50 @@ class TestKeyValueStorageEngine { assertEquals(15, metrics.restoredBytesGauge.getValue) // 3 keys * 2 bytes/key + 3 msgs * 3 bytes/msg } + @Test + def testRestoreInterruptedThrowsInterruptException(): Unit = { + val changelogSSP = new SystemStreamPartition("TestSystem", "TestStream", new Partition(0)) + val iterator = mock(classOf[ChangelogSSPIterator]) + val executorService = Executors.newSingleThreadExecutor() + val restore = new Callable[Void] { + override def call(): Void = { + engine.restore(iterator) + null + } + } + + when(iterator.hasNext) + .thenReturn(true) + .thenReturn(true) + .thenAnswer(new Answer[Boolean] { + override def 
answer(invocation: InvocationOnMock): Boolean = { + executorService.shutdownNow() + true + } + }) + .thenReturn(false) + when(iterator.next()) + .thenReturn(new IncomingMessageEnvelope(changelogSSP, "0", Array[Byte](1, 2), Array[Byte](3, 4, 5))) + .thenReturn(new IncomingMessageEnvelope(changelogSSP, "1", Array[Byte](2, 3), Array[Byte](4, 5, 6))) + .thenReturn(new IncomingMessageEnvelope(changelogSSP, "2", Array[Byte](3, 4), Array[Byte](5, 6, 7))) + when(iterator.getMode) + .thenReturn(Mode.RESTORE) + .thenReturn(Mode.RESTORE) + .thenReturn(Mode.RESTORE) + + try { + val restoreFuture = executorService.submit(restore) + restoreFuture.get() + fail("Expected execution exception during restoration") + } catch { + case e: ExecutionException => { + assertTrue(e.getCause.isInstanceOf[InterruptedException]) + // Make sure we don't restore any more records and bail out + assertEquals(2, metrics.restoredMessagesGauge.getValue) + } + } + } + @Test(expected = classOf[IllegalStateException]) def testThrowsIfIteratorModeChangesFromTrimToRestore(): Unit = { val changelogSSP = new SystemStreamPartition("TestSystem", "TestStream", new Partition(0))