diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 791021604d..b1dcf62937 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -20,14 +20,16 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.samza.SamzaException;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobCoordinatorConfig;
@@ -47,6 +49,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.samza.processor.StreamProcessor.State.*;
+import static org.apache.samza.util.StateTransitionUtil.*;
+
/**
* StreamProcessor can be embedded in any application or executed in a distributed environment (aka cluster) as an
* independent process.
@@ -65,27 +70,27 @@
* Describes the valid state transitions of the {@link StreamProcessor}.
*
*
- * ────────────────────────────────
- * │ │
- * │ │
- * │ │
- * │ │
- * New StreamProcessor.start() Rebalance triggered V Receives JobModel │
- * StreamProcessor ──────────▶ NEW ───────────────────────────▶ STARTED ──────────────────▶ IN_REBALANCE ─────────────────────▶ RUNNING
- * Creation │ │ by group leader │ and starts Container │
- * │ │ │ │
- * Stre│amProcessor.stop() Stre│amProcessor.stop() Stre│amProcessor.stop() Stre│amProcessor.stop()
- * │ │ │ │
- * │ │ │ │
- * │ │ │ │
- * V V V V
- * ───────────────────────────▶ STOPPING D──────────────────────────────────────────────────────────
- * │
- * │
- * After JobCoordinator and SamzaContainer had shutdown.
- * │
- * V
- * STOPPED
+ * ───────────────────────────────
+ * │ │
+ * │ │
+ * │ │
+ * │ │
+ * New StreamProcessor.start() Rebalance triggered V Receives JobModel │
+ * StreamProcessor ──────────▶ NEW ───────────────────────────▶ STARTED ─────────────────–––––––––––––––––––––─▶ START_REBALANCE ––––––––––––––––▶ IN_REBALANCE ─────────────────────▶ RUNNING
+ * Creation │ │ by group leader and starts Container │ │
+ * │ │ │ │
+ * Stre│amProcessor.stop() Stre│amProcessor.stop() Stre│amProcessor.stop() Stre│amProcessor.stop()
+ * │ │ │ │
+ * │ │ │ │
+ * │ │ │ │
+ * V V V V
+ * ───────────–––––––––––––––––––––––––––––––––––––––––––––──────────────––––––––──▶ STOPPING ──────────────────────────────────────────────────────────–––
+ * │
+ * │
+ * After JobCoordinator and SamzaContainer had shutdown.
+ * │
+ * V
+ * STOPPED
*
*
*/
@@ -102,17 +107,26 @@ public class StreamProcessor {
private final long taskShutdownMs;
private final String processorId;
private final ExecutorService executorService;
- private final Object lock = new Object();
private Throwable containerException = null;
- volatile CountDownLatch containerShutdownLatch = new CountDownLatch(1);
+ @VisibleForTesting
+ AtomicReference state = new AtomicReference<>(NEW);
+
+ @VisibleForTesting
+ SamzaContainer container = null;
+
+ @VisibleForTesting
+ JobCoordinatorListener jobCoordinatorListener = null;
+
+ volatile CountDownLatch containerShutdownLatch = new CountDownLatch(0);
/**
* Indicates the current status of a {@link StreamProcessor}.
*/
public enum State {
- STARTED("STARTED"), RUNNING("RUNNING"), STOPPING("STOPPING"), STOPPED("STOPPED"), NEW("NEW"), IN_REBALANCE("IN_REBALANCE");
+ STARTED("STARTED"), RUNNING("RUNNING"), STOPPING("STOPPING"), STOPPED("STOPPED"), NEW("NEW"),
+ START_REBALANCE("START_REBALANCE"), IN_REBALANCE("IN_REBALANCE");
private String strVal;
@@ -130,18 +144,9 @@ public String toString() {
* @return the current state of StreamProcessor.
*/
public State getState() {
- return state;
+ return state.get();
}
- @VisibleForTesting
- State state = State.NEW;
-
- @VisibleForTesting
- SamzaContainer container = null;
-
- @VisibleForTesting
- JobCoordinatorListener jobCoordinatorListener = null;
-
/**
* StreamProcessor encapsulates and manages the lifecycle of {@link JobCoordinator} and {@link SamzaContainer}.
*
@@ -213,14 +218,17 @@ public StreamProcessor(Config config, Map customMetrics
*
*/
public void start() {
- synchronized (lock) {
- if (state == State.NEW) {
- processorListener.beforeStart();
- state = State.STARTED;
- jobCoordinator.start();
- } else {
- LOGGER.info("Start is no-op, since the current state is {} and not {}.", state, State.NEW);
+ if (state.get() == NEW) {
+ processorListener.beforeStart();
+ jobCoordinator.start();
+
+ if (!state.compareAndSet(NEW, STARTED)) {
+ LOGGER.info("Failed to transition to STARTED since the current state is {} and not {}", state.get(), NEW);
+ // todo: Should stop be invoked or do we throw exception?
+ // todo: We ideally want to call stop to make sure resources spun by jobcoordinator.start() are cleaned up
}
+ } else {
+ LOGGER.info("Stream processor has already been initialized and the current state {}", state.get());
}
}
@@ -250,24 +258,14 @@ public void start() {
*
*/
public void stop() {
- synchronized (lock) {
- if (state != State.STOPPING && state != State.STOPPED) {
- state = State.STOPPING;
- try {
- LOGGER.info("Shutting down the container: {} of stream processor: {}.", container, processorId);
- boolean hasContainerShutdown = stopSamzaContainer();
- if (!hasContainerShutdown) {
- LOGGER.info("Interrupting the container: {} thread to die.", container);
- executorService.shutdownNow();
- }
- } catch (Throwable throwable) {
- LOGGER.error(String.format("Exception occurred on container: %s shutdown of stream processor: %s.", container, processorId), throwable);
- }
- LOGGER.info("Shutting down JobCoordinator of stream processor: {}.", processorId);
- jobCoordinator.stop();
- } else {
- LOGGER.info("StreamProcessor state is: {}. Ignoring the stop.", state);
- }
+ if (compareNotInAndSet(state, ImmutableSet.of(STOPPING, STOPPED), STOPPING)) {
+ LOGGER.info("Shutting down stream processor: {}", processorId);
+ stopSamzaContainer();
+
+ LOGGER.info("Shutting down job coordinator for the stream processor: {}", processorId);
+ jobCoordinator.stop();
+ } else if (state.get() == STOPPING || state.get() == STOPPED) {
+ LOGGER.info("Shutdown in progress for stream processor: {} with state {}", processorId, state.get());
}
}
@@ -293,28 +291,20 @@ private JobCoordinator createJobCoordinator() {
/**
* Stops the {@link SamzaContainer}.
- * @return true if {@link SamzaContainer} had shutdown within task.shutdown.ms. false otherwise.
*/
- private boolean stopSamzaContainer() {
- boolean hasContainerShutdown = true;
+ private void stopSamzaContainer() {
+ LOGGER.info("Shutting down the container: {} for the stream processor: {}", container, processorId);
if (container != null) {
if (!container.hasStopped()) {
try {
container.shutdown();
- LOGGER.info("Waiting {} ms for the container: {} to shutdown.", taskShutdownMs, container);
- hasContainerShutdown = containerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
} catch (IllegalContainerStateException icse) {
LOGGER.info(String.format("Cannot shutdown container: %s for stream processor: %s. Container is not running.", container, processorId), icse);
- } catch (Exception e) {
- LOGGER.error("Exception occurred when shutting down the container: {}.", container, e);
- hasContainerShutdown = false;
}
- LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %b.", container, processorId, hasContainerShutdown));
} else {
- LOGGER.info("Container is not instantiated for stream processor: {}.", processorId);
+ LOGGER.info("Container is not instantiated for the stream processor: {}", processorId);
}
}
- return hasContainerShutdown;
}
private JobCoordinatorListener createJobCoordinatorListener() {
@@ -322,63 +312,95 @@ private JobCoordinatorListener createJobCoordinatorListener() {
@Override
public void onJobModelExpired() {
- synchronized (lock) {
- if (state == State.STARTED || state == State.RUNNING) {
- state = State.IN_REBALANCE;
- LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", container, processorId);
- boolean hasContainerShutdown = stopSamzaContainer();
- if (!hasContainerShutdown) {
- LOGGER.warn("Container: {} shutdown was unsuccessful. Stopping the stream processor: {}.", container, processorId);
- state = State.STOPPING;
- jobCoordinator.stop();
- } else {
- LOGGER.info("Container: {} shutdown completed for stream processor: {}.", container, processorId);
- }
+ if (compareAndSet(state, ImmutableSet.of(STARTED, RUNNING), START_REBALANCE)) {
+ LOGGER.info("Job model expired for the stream processor: {}", processorId);
+ stopSamzaContainer();
+
+ LOGGER.info("Entering re-balance phase with a barrier on container shutdown for the stream processor: {}", processorId);
+ boolean inRebalance = compareAndSetWithBarrier(state, START_REBALANCE
+ , IN_REBALANCE, containerShutdownLatch, Duration.ofMillis(taskShutdownMs));
+
+ // failed to transition to IN_REBALANCE either container shutdown failed or barrier timed out
+ if (!inRebalance) {
+ LOGGER.warn("Failed to transition to re-balance phase."
+ + " Stopping the stream processor: {} due to unclean container shutdown.", processorId);
+ jobCoordinator.stop();
} else {
- LOGGER.info("Ignoring onJobModelExpired invocation since the current state is {} and not in {}.", state, ImmutableList.of(State.RUNNING, State.STARTED));
+ LOGGER.info("Successfully transitioned to re-balance phase for the stream processor: {}", processorId);
}
+ } else {
+ LOGGER.info("Ignoring onJobModelExpired since the current state is {} and not in {}.",
+ state.get(), ImmutableSet.of(RUNNING, STARTED));
}
}
@Override
public void onNewJobModel(String processorId, JobModel jobModel) {
- synchronized (lock) {
- if (state == State.IN_REBALANCE) {
- containerShutdownLatch = new CountDownLatch(1);
- container = createSamzaContainer(processorId, jobModel);
- container.setContainerListener(new ContainerListener());
- LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId);
- executorService.submit(container);
- } else {
- LOGGER.info("Ignoring onNewJobModel invocation since the current state is {} and not {}.", state, State.IN_REBALANCE);
- }
+ if (state.get() == IN_REBALANCE) {
+ LOGGER.info("New job model received for the stream processor: {}", processorId);
+ containerShutdownLatch = new CountDownLatch(1);
+ container = createSamzaContainer(processorId, jobModel);
+ container.setContainerListener(new ContainerListener());
+ LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId);
+ executorService.submit(container);
+ } else {
+ LOGGER.info("Ignoring onNewJobModel invocation since the current state is {} and not {}.",
+ state.get(), IN_REBALANCE);
}
}
@Override
public void onCoordinatorStop() {
- synchronized (lock) {
- LOGGER.info("Shutting down the executor service of the stream processor: {}.", processorId);
+ LOGGER.info("Received shutdown request from job coordinator. Shutting down the stream processor: {}", processorId);
+
+ // Stop samza container only when stream processor state is not in STOPPING or STOPPED
+ if (compareNotInAndSet(state, ImmutableSet.of(STOPPING, STOPPED), STOPPING)) {
stopSamzaContainer();
- executorService.shutdownNow();
- state = State.STOPPED;
}
- if (containerException != null)
- processorListener.afterFailure(containerException);
- else
- processorListener.afterStop();
+ if (state.get() == STOPPING) {
+ LOGGER.info("Attempting to shutdown stream processor: {} with a barrier on container shutdown.", processorId);
+ boolean stopped = compareAndSetWithBarrier(state, STOPPING, STOPPED, containerShutdownLatch, Duration.ofMillis(taskShutdownMs));
+
+ if (containerException != null) {
+ processorListener.afterFailure(containerException);
+ } else if (stopped) {
+ processorListener.afterStop();
+ } else {
+ executorService.shutdownNow();
+ processorListener.afterFailure(new SamzaException("Samza container did not shutdown cleanly."));
+ }
+
+ LOGGER.info("Shutdown for the stream processor: {} completed with status={}"
+ , processorId, containerException == null && stopped);
+ } else {
+ LOGGER.info("Ignoring shutdown request since the current state is {} and not {} for the stream processor: {}"
+ , new Object[] {state.get(), STOPPING, processorId});
+ }
}
@Override
public void onCoordinatorFailure(Throwable throwable) {
- synchronized (lock) {
- LOGGER.info(String.format("Coordinator: %s failed with an exception. Stopping the stream processor: %s. Original exception:", jobCoordinator, processorId), throwable);
+ LOGGER.info("Received shutdown request for the stream processor: {} from job coordinator due to {}"
+ , processorId, throwable);
+
+ if (compareNotInAndSet(state, ImmutableSet.of(STOPPING, STOPPED), STOPPING)) {
stopSamzaContainer();
- executorService.shutdownNow();
- state = State.STOPPED;
}
- processorListener.afterFailure(throwable);
+
+ if (state.get() == STOPPING) {
+ LOGGER.info("Attempting to shutdown stream processor: {} with a barrier on container shutdown.", processorId);
+ if (!compareAndSetWithBarrier(state, STOPPING, STOPPED, containerShutdownLatch, Duration.ofMillis(taskShutdownMs))) {
+ executorService.shutdownNow();
+ }
+
+ // todo: Should we prioritize which exception to throw to the processor listener?
+ processorListener.afterFailure(throwable);
+ LOGGER.info("Shutdown for the stream processor: {} completed with status=false");
+ } else {
+ LOGGER.info("Ignoring shutdown request since the current state is {} and not {} for the stream processor: {}"
+ , new Object[] {state.get(), STOPPING, processorId});
+ }
}
};
}
@@ -402,35 +424,39 @@ public void beforeStart() {
@Override
public void afterStart() {
- LOGGER.warn("Received container start notification for container: {} in stream processor: {}.", container, processorId);
- if (!processorOnStartCalled) {
- processorListener.afterStart();
- processorOnStartCalled = true;
+ LOGGER.info("Received container start notification from container: {} for stream processor: {}",
+ container, processorId);
+
+ if (compareAndSet(state, ImmutableSet.of(STARTED, IN_REBALANCE), RUNNING)) {
+ if (!processorOnStartCalled) {
+ processorListener.afterStart();
+ processorOnStartCalled = true;
+ }
+ LOGGER.info("Stream processor started!");
+ } else {
+ LOGGER.info("Invalid state transition from {} to {}", state.get(), RUNNING);
}
- state = State.RUNNING;
}
@Override
public void afterStop() {
+ LOGGER.info("Received stop notification from container: {} for stream processor: {}", container, processorId);
containerShutdownLatch.countDown();
- synchronized (lock) {
- if (state == State.IN_REBALANCE) {
- LOGGER.info("Container: {} of the stream processor: {} was stopped by the JobCoordinator.", container, processorId);
- } else {
- LOGGER.info("Container: {} stopped. Stopping the stream processor: {}.", container, processorId);
- state = State.STOPPING;
- jobCoordinator.stop();
- }
+
+ if (compareNotInAndSet(state, ImmutableSet.of(STOPPING, STOPPED, START_REBALANCE, IN_REBALANCE), STOPPING)) {
+ LOGGER.info("Container: {} stopped. Stopping the stream processor: {}.", container, processorId);
+ jobCoordinator.stop();
}
}
@Override
public void afterFailure(Throwable t) {
+ LOGGER.info("Received failure notification from container: {} for stream processor: {}", container, processorId);
+ containerException = t;
containerShutdownLatch.countDown();
- synchronized (lock) {
- LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s. Original exception:", container, processorId), t);
- state = State.STOPPING;
- containerException = t;
+
+ if (compareNotInAndSet(state, ImmutableSet.of(STOPPING, STOPPED), STOPPING)) {
+ LOGGER.error("Stopping the stream processor: {} due to container exception {}", processorId, t);
jobCoordinator.stop();
}
}
diff --git a/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java b/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java
new file mode 100644
index 0000000000..f90e06e321
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A utility class to perform complex state transitions.
+ */
+public class StateTransitionUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(StateTransitionUtil.class);
+
+ /**
+ * Atomically sets the value to the given updated value if current value is one of the expected values.
+ *
+ * @param reference the atomic reference
+ * @param expectedValues set of expected values
+ * @param update the new value
+ * @param type of the atomic reference
+ *
+ * @return true if current state is one of the expected value and transition was successful; false otherwise
+ */
+ public static boolean compareAndSet(AtomicReference reference, Collection expectedValues, T update) {
+ while (true) {
+ T currentValue = reference.get();
+ if (expectedValues.contains(currentValue)) {
+ if (reference.compareAndSet(currentValue, update)) {
+ return true;
+ }
+ } else {
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Atomically sets the value to the given updated value if the black listed values does not contain current value.
+ *
+ * @param reference the atomic reference
+ * @param blacklistedValues set of blacklisted values
+ * @param update the new value
+ * @param type of the atomic reference
+ *
+ * @return true if current state is not in the blacklisted values and transition was successful; false otherwise
+ */
+ public static boolean compareNotInAndSet(AtomicReference reference, Collection blacklistedValues, T update) {
+ while (true) {
+ T currentValue = reference.get();
+ if (blacklistedValues.contains(currentValue)) {
+ return false;
+ }
+
+ if (reference.compareAndSet(currentValue, update)) {
+ return true;
+ }
+ }
+ }
+
+ /**
+ * Atomically sets the value to the updated value if the current value == expected value and the barrier
+ * latch counts down to zero. If the barrier times out determined by the timeout parameter, the atomic reference
+ * is not updated.
+ *
+ * @param reference the atomic reference
+ * @param expect the expected value
+ * @param update the new value
+ * @param barrier the barrier latch
+ * @param timeout the timeout for barrier
+ * @param type of the atomic reference
+ *
+ * @return true if current state == expected value and the barrier completed and transition was successful; false otherwise
+ */
+ public static boolean compareAndSetWithBarrier(AtomicReference reference, T expect, T update,
+ CountDownLatch barrier, Duration timeout) {
+ if (reference.get() != expect) {
+ LOG.error("Failed to transition from {} to {}", expect, update);
+ throw new IllegalStateException("Cannot transition to " + update + " from state " + expect
+ + " since the current state is " + reference.get());
+ }
+
+ try {
+ if (timeout.isNegative()) {
+ barrier.await();
+ } else {
+ boolean completed = barrier.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ if (!completed) {
+ LOG.error("Failed to transition from {} to {} due to barrier timeout.", expect, update);
+ return false;
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Failed to transition from {} to {} due to {}", new Object[] {expect, update, e});
+ return false;
+ }
+
+ return reference.compareAndSet(expect, update);
+ }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
index 673015aa56..8846c68fcb 100644
--- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
+++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
@@ -49,7 +49,7 @@
import org.powermock.api.mockito.PowerMockito;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.*;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -363,14 +363,18 @@ public void testOnJobModelExpiredShouldMakeCorrectStateTransitions() {
* state and should shutdown JobCoordinator.
*/
Mockito.doNothing().when(mockJobCoordinator).start();
- Mockito.doNothing().when(mockJobCoordinator).stop();
+ Mockito.doAnswer(ans -> {
+ streamProcessor.state.set(State.STOPPING);
+ return null;
+ }).when(mockJobCoordinator).stop();
Mockito.doNothing().when(mockSamzaContainer).shutdown();
Mockito.when(mockSamzaContainer.hasStopped()).thenReturn(false);
Mockito.when(mockSamzaContainer.getStatus())
.thenReturn(SamzaContainerStatus.STARTED)
.thenReturn(SamzaContainerStatus.STOPPED);
streamProcessor.container = mockSamzaContainer;
- streamProcessor.state = State.STARTED;
+ streamProcessor.containerShutdownLatch = new CountDownLatch(1);
+ streamProcessor.state.set(State.STARTED);
streamProcessor.jobCoordinatorListener.onJobModelExpired();
@@ -379,27 +383,28 @@ public void testOnJobModelExpiredShouldMakeCorrectStateTransitions() {
Mockito.verify(mockJobCoordinator, Mockito.times(1)).stop();
// If StreamProcessor is in IN_REBALANCE state, onJobModelExpired should be a NO_OP.
- streamProcessor.state = State.IN_REBALANCE;
+ streamProcessor.state.set(State.IN_REBALANCE);
streamProcessor.jobCoordinatorListener.onJobModelExpired();
- assertEquals(State.IN_REBALANCE, streamProcessor.state);
+ assertEquals(State.IN_REBALANCE, streamProcessor.getState());
}
@Test
- public void testOnNewJobModelShouldResultInValidStateTransitions() throws Exception {
+ public void testOnNewJobModelShouldResultInValidStateTransitions() {
JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class);
SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
- StreamProcessor streamProcessor = PowerMockito.spy(new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator));
+ StreamProcessor streamProcessor = new TestableStreamProcessor(config, new HashMap<>(), null,
+ lifecycleListener, mockJobCoordinator, mockSamzaContainer);
- streamProcessor.container = mockSamzaContainer;
- streamProcessor.state = State.IN_REBALANCE;
+ streamProcessor.state.set(State.IN_REBALANCE);
Mockito.doNothing().when(mockSamzaContainer).run();
streamProcessor.jobCoordinatorListener.onNewJobModel("TestProcessorId", new JobModel(new MapConfig(), new HashMap<>()));
+ Mockito.verify(mockSamzaContainer, Mockito.times(1)).setContainerListener(any());
Mockito.verify(mockSamzaContainer, Mockito.atMost(1)).run();
}
@@ -418,11 +423,11 @@ public void testStopShouldBeIdempotent() {
.thenReturn(SamzaContainerStatus.STARTED)
.thenReturn(SamzaContainerStatus.STOPPED);
- streamProcessor.state = State.RUNNING;
+ streamProcessor.state.set(State.RUNNING);
streamProcessor.stop();
- assertEquals(State.STOPPING, streamProcessor.state);
+ assertEquals(State.STOPPING, streamProcessor.getState());
}
@Test
@@ -436,13 +441,13 @@ public void testCoordinatorFailureShouldStopTheStreamProcessor() {
Exception failureException = new Exception("dummy exception");
streamProcessor.container = mockSamzaContainer;
- streamProcessor.state = State.RUNNING;
+ streamProcessor.state.set(State.RUNNING);
streamProcessor.jobCoordinatorListener.onCoordinatorFailure(failureException);
Mockito.doNothing().when(mockSamzaContainer).shutdown();
Mockito.when(mockSamzaContainer.hasStopped()).thenReturn(false);
- assertEquals(State.STOPPED, streamProcessor.state);
+ assertEquals(State.STOPPED, streamProcessor.state.get());
Mockito.verify(lifecycleListener).afterFailure(failureException);
Mockito.verify(mockSamzaContainer).shutdown();
}
@@ -454,10 +459,10 @@ public void testCoordinatorStopShouldStopTheStreamProcessor() {
MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
StreamProcessor streamProcessor = new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator);
- streamProcessor.state = State.RUNNING;
+ streamProcessor.state.set(State.RUNNING);
streamProcessor.jobCoordinatorListener.onCoordinatorStop();
- assertEquals(State.STOPPED, streamProcessor.state);
+ assertEquals(State.STOPPED, streamProcessor.getState());
Mockito.verify(lifecycleListener).afterStop();
}
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestStateTransitionUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestStateTransitionUtil.java
new file mode 100644
index 0000000000..263d99302c
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/util/TestStateTransitionUtil.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Test class for {@link StateTransitionUtil}
+ */
+public class TestStateTransitionUtil {
+ private static final List STATES = ImmutableList.of("A", "B", "C", "D", "E");
+ private static final Map> VALID_TRANSITIONS = new ImmutableMap.Builder>()
+ .put("A", ImmutableList.of("C", "D"))
+ .put("B", ImmutableList.of("D", "E"))
+ .put("C", ImmutableList.of("D", "A", "E"))
+ .put("D", ImmutableList.of("A", "B"))
+ .put("E", ImmutableList.of("B", "C"))
+ .build();
+
+ private static final Map> INVALID_TRANSITIONS = new ImmutableMap.Builder>()
+ .put("A", ImmutableList.of("A", "B", "E"))
+ .put("B", ImmutableList.of("A", "B", "C"))
+ .put("C", ImmutableList.of("B", "C"))
+ .put("D", ImmutableList.of("C", "D", "E"))
+ .put("E", ImmutableList.of("A", "D", "E"))
+ .build();
+
+ private static final Random RANDOM = new Random();
+
+ @Test
+ public void testCompareAndSet() {
+ AtomicReference reference = new AtomicReference<>();
+
+ for (String desiredState : STATES) {
+ String currentState = STATES.get(RANDOM.nextInt(STATES.size()));
+ reference.set(currentState);
+
+ boolean actual = StateTransitionUtil.compareAndSet(reference, VALID_TRANSITIONS.get(desiredState), desiredState);
+ boolean expected = VALID_TRANSITIONS.get(desiredState).contains(currentState);
+ assertEquals(expected, actual);
+ }
+ }
+
+ @Test
+ public void testCompareNotInAndSet() {
+ AtomicReference reference = new AtomicReference<>();
+
+ for (String desiredState : STATES) {
+ String currentState = STATES.get(RANDOM.nextInt(STATES.size()));
+ reference.set(currentState);
+
+ boolean actual = StateTransitionUtil.compareNotInAndSet(reference, INVALID_TRANSITIONS.get(desiredState), desiredState);
+ boolean expected = !INVALID_TRANSITIONS.get(desiredState).contains(currentState);
+ assertEquals(expected, actual);
+ }
+ }
+
+ @Test
+ public void testCompareAndSetWithBarrierTimeout() {
+ String currentState = STATES.get(RANDOM.nextInt(STATES.size()));
+ String desiredState = STATES.get(RANDOM.nextInt(STATES.size()));
+ AtomicReference reference = new AtomicReference<>(currentState);
+
+ final CountDownLatch barrier = new CountDownLatch(1);
+ Duration timeout = Duration.ofMillis(RANDOM.nextInt(1000));
+ boolean transitionResult = StateTransitionUtil.compareAndSetWithBarrier(reference, currentState, desiredState, barrier, timeout);
+
+ assertFalse(transitionResult);
+ }
+
+ @Test
+ public void testCompareAndSetWithBarrierAlreadyCompleted() {
+ String currentState = STATES.get(RANDOM.nextInt(STATES.size()));
+ String desiredState = STATES.get(RANDOM.nextInt(STATES.size()));
+ AtomicReference reference = new AtomicReference<>(currentState);
+
+ final CountDownLatch barrier = new CountDownLatch(0);
+ Duration timeout = Duration.ofMillis(RANDOM.nextInt(1000));
+ boolean transitionResult = StateTransitionUtil.compareAndSetWithBarrier(reference, currentState, desiredState, barrier, timeout);
+
+ assertTrue(transitionResult);
+ }
+
+ @Test
+ public void testCompareAndSetWithBarrierCompletionWithStateChange() {
+ String currentState = STATES.get(RANDOM.nextInt(STATES.size()));
+ String desiredState = STATES.get(RANDOM.nextInt(STATES.size()));
+ AtomicReference reference = new AtomicReference<>(currentState);
+
+ final CountDownLatch barrier = new CountDownLatch(1);
+ Duration timeout = Duration.ofMillis(RANDOM.nextInt(1000));
+
+ CompletableFuture.runAsync(() -> {
+ try {
+ Thread.sleep(timeout.toMillis());
+ } catch (InterruptedException e) {
+
+ }
+
+ for (String state : STATES) {
+ if (!currentState.equals(state)) {
+ reference.set(state);
+ break;
+ }
+ }
+ barrier.countDown();
+ });
+
+ boolean transitionResult = StateTransitionUtil.compareAndSetWithBarrier(reference, currentState, desiredState, barrier, Duration.ofMillis(-1));
+ assertFalse(transitionResult);
+ }
+
+ @Test
+ public void testCompareAndSetWithBarrierCompletion() {
+ String currentState = STATES.get(RANDOM.nextInt(STATES.size()));
+ String desiredState = STATES.get(RANDOM.nextInt(STATES.size()));
+ AtomicReference reference = new AtomicReference<>(currentState);
+
+ final CountDownLatch barrier = new CountDownLatch(1);
+ Duration timeout = Duration.ofMillis(RANDOM.nextInt(1000));
+
+ CompletableFuture.runAsync(() -> {
+ try {
+ Thread.sleep(timeout.toMillis());
+ } catch (InterruptedException e) {
+
+ }
+
+ barrier.countDown();
+ });
+
+ boolean transitionResult = StateTransitionUtil.compareAndSetWithBarrier(reference, currentState, desiredState, barrier, Duration.ofMillis(-1));
+ assertTrue(transitionResult);
+ }
+}
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
new file mode 100644
index 0000000000..9210db3791
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.framework;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.test.operator.data.PageView;
+import org.junit.Test;
+
+import static org.apache.samza.test.framework.TestTimerApp.*;
+import static org.junit.Assert.*;
+
+
+public class FaultInjectionTest extends StreamApplicationIntegrationTestHarness {
+ @Test
+ public void testRaceCondition() throws InterruptedException {
+ Map configs = new HashMap<>();
+ configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
+ configs.put("job.systemstreampartition.grouper.factory", "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory");
+ configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.SingleContainerGrouperFactory");
+ configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
+ configs.put(FaultInjectionStreamApp.INPUT_TOPIC_NAME_PROP, "page-views");
+ configs.put("task.shutdown.ms", "1000");
+ configs.put(JobConfig.PROCESSOR_ID(), "0");
+
+ createTopic(PAGE_VIEWS, 2);
+
+ // create events for the following user activity.
+ // userId: (viewId, pageId, (adIds))
+ // u1: (v1, p1, (a1)), (v2, p2, (a3))
+ // u2: (v3, p1, (a1)), (v4, p3, (a5))
+ produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}");
+ produceMessage(PAGE_VIEWS, 1, "p2", "{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}");
+
+ RunApplicationContext context =
+ runApplication(new FaultInjectionStreamApp(), "fault-injection-app", configs);
+ context.getRunner().kill();
+ context.getRunner().waitForFinish();
+ assertEquals(context.getRunner().status(), ApplicationStatus.UnsuccessfulFinish);
+ }
+
+ private static class FaultInjectionStreamApp implements StreamApplication {
+ public static final String SYSTEM = "kafka";
+ public static final String INPUT_TOPIC_NAME_PROP = "inputTopicName";
+
+ @Override
+ public void describe(StreamApplicationDescriptor appDesc) {
+ Config config = appDesc.getConfig();
+ String inputTopic = config.get(INPUT_TOPIC_NAME_PROP);
+
+ final JsonSerdeV2 serde = new JsonSerdeV2<>(PageView.class);
+ KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
+ KafkaInputDescriptor isd = ksd.getInputDescriptor(inputTopic, serde);
+ final MessageStream broadcastPageViews = appDesc
+ .getInputStream(isd)
+ .map(message -> {
+ throw new RuntimeException("failed");
+ });
+ }
+ }
+}
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index b249d4d287..b84886370b 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -617,6 +617,9 @@ public void testShouldStopStreamApplicationWhenShutdownTimeOutIsLessThanContaine
// Trigger re-balancing phase, by manually adding a new processor.
configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]);
+
+ // Reset the task shutdown ms for 3rd application to give it ample time to shutdown cleanly
+ configMap.put(TaskConfig.SHUTDOWN_MS(), TASK_SHUTDOWN_MS);
Config applicationConfig3 = new MapConfig(configMap);
CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
@@ -633,9 +636,8 @@ public void testShouldStopStreamApplicationWhenShutdownTimeOutIsLessThanContaine
/**
* If the processing has started in the third stream processor, then other two stream processors should be stopped.
*/
- // TODO: This is a bug! Status should be unsuccessful finish.
- assertEquals(ApplicationStatus.SuccessfulFinish, appRunner1.status());
- assertEquals(ApplicationStatus.SuccessfulFinish, appRunner2.status());
+ assertEquals(ApplicationStatus.UnsuccessfulFinish, appRunner1.status());
+ assertEquals(ApplicationStatus.UnsuccessfulFinish, appRunner2.status());
appRunner3.kill();
appRunner3.waitForFinish();