From 8af1d28a28672aa07504564172acf65d5e5df05f Mon Sep 17 00:00:00 2001 From: bharathkk Date: Mon, 24 Sep 2018 16:39:42 -0700 Subject: [PATCH 1/7] Fix race condition between StreamProcessor and SamzaContainerListener --- .../samza/processor/StreamProcessor.java | 196 +++++++++--------- .../samza/util/StateTransitionUtil.java | 101 +++++++++ .../samza/processor/TestStreamProcessor.java | 23 +- .../test/framework/FaultInjectionTest.java | 92 ++++++++ 4 files changed, 308 insertions(+), 104 deletions(-) create mode 100644 samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java create mode 100644 samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 791021604d..61664a96ca 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -20,14 +20,16 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.time.Duration; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.samza.SamzaException; import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.config.Config; import org.apache.samza.config.JobCoordinatorConfig; @@ -47,6 +49,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.samza.processor.StreamProcessor.State.*; +import static org.apache.samza.util.StateTransitionUtil.*; + /** * StreamProcessor can be embedded in any application or executed in a distributed environment (aka cluster) as an * independent process. @@ -79,7 +84,7 @@ * │ │ │ │ * │ │ │ │ * V V V V - * ───────────────────────────▶ STOPPING D────────────────────────────────────────────────────────── + * ───────────────────────────▶ STOPPING ────────────────────────────────────────────────────────── * │ * │ * After JobCoordinator and SamzaContainer had shutdown. @@ -102,17 +107,17 @@ public class StreamProcessor { private final long taskShutdownMs; private final String processorId; private final ExecutorService executorService; - private final Object lock = new Object(); private Throwable containerException = null; - volatile CountDownLatch containerShutdownLatch = new CountDownLatch(1); + volatile CountDownLatch containerShutdownLatch = new CountDownLatch(0); /** * Indicates the current status of a {@link StreamProcessor}. */ public enum State { - STARTED("STARTED"), RUNNING("RUNNING"), STOPPING("STOPPING"), STOPPED("STOPPED"), NEW("NEW"), IN_REBALANCE("IN_REBALANCE"); + STARTING("STARTING"), STARTED("STARTED"), RUNNING("RUNNING"), STOPPING("STOPPING"), STOPPED("STOPPED"), NEW("NEW"), + START_REBALANCE("START_REBALANCE"), IN_REBALANCE("IN_REBALANCE"); private String strVal; @@ -130,11 +135,11 @@ public String toString() { * @return the current state of StreamProcessor. */ public State getState() { - return state; + return state.get(); } @VisibleForTesting - State state = State.NEW; + AtomicReference state = new AtomicReference<>(NEW); @VisibleForTesting SamzaContainer container = null; @@ -213,14 +218,16 @@ public StreamProcessor(Config config, Map customMetrics *

*/ public void start() { - synchronized (lock) { - if (state == State.NEW) { - processorListener.beforeStart(); - state = State.STARTED; - jobCoordinator.start(); - } else { - LOGGER.info("Start is no-op, since the current state is {} and not {}.", state, State.NEW); - } + if (state.compareAndSet(NEW, STARTING)) { + processorListener.beforeStart(); + } else { + LOGGER.info("Stream processor has already been initialized and the current state {}", state.get()); + } + + if (state.compareAndSet(STARTING, STARTED)) { + jobCoordinator.start(); + } else { + LOGGER.info("Stream processor has already started and the current state {}", state.get()); } } @@ -250,24 +257,15 @@ public void start() { * */ public void stop() { - synchronized (lock) { - if (state != State.STOPPING && state != State.STOPPED) { - state = State.STOPPING; - try { - LOGGER.info("Shutting down the container: {} of stream processor: {}.", container, processorId); - boolean hasContainerShutdown = stopSamzaContainer(); - if (!hasContainerShutdown) { - LOGGER.info("Interrupting the container: {} thread to die.", container); - executorService.shutdownNow(); - } - } catch (Throwable throwable) { - LOGGER.error(String.format("Exception occurred on container: %s shutdown of stream processor: %s.", container, processorId), throwable); - } - LOGGER.info("Shutting down JobCoordinator of stream processor: {}.", processorId); - jobCoordinator.stop(); - } else { - LOGGER.info("StreamProcessor state is: {}. Ignoring the stop.", state); - } + if (compareNotInAndSet(state, ImmutableSet.of(STOPPING, STOPPED), STOPPING)) { + LOGGER.info("Shutting down the container: {} of stream processor: {}.", container, processorId); + stopSamzaContainer(); + + LOGGER.info("Shutting down JobCoordinator of stream processor: {}.", processorId); + jobCoordinator.stop(); + } else if (state.get() == STOPPING || state.get() == STOPPED) { + // do we need to wait here or can we move on + LOGGER.info("Stream processor is shutting down with state {}", state.get()); } } @@ -293,28 +291,19 @@ private JobCoordinator createJobCoordinator() { /** * Stops the {@link SamzaContainer}. - * @return true if {@link SamzaContainer} had shutdown within task.shutdown.ms. false otherwise. */ - private boolean stopSamzaContainer() { - boolean hasContainerShutdown = true; + private void stopSamzaContainer() { if (container != null) { if (!container.hasStopped()) { try { container.shutdown(); - LOGGER.info("Waiting {} ms for the container: {} to shutdown.", taskShutdownMs, container); - hasContainerShutdown = containerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS); } catch (IllegalContainerStateException icse) { LOGGER.info(String.format("Cannot shutdown container: %s for stream processor: %s. Container is not running.", container, processorId), icse); - } catch (Exception e) { - LOGGER.error("Exception occurred when shutting down the container: {}.", container, e); - hasContainerShutdown = false; } - LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %b.", container, processorId, hasContainerShutdown)); } else { LOGGER.info("Container is not instantiated for stream processor: {}.", processorId); } } - return hasContainerShutdown; } private JobCoordinatorListener createJobCoordinatorListener() { @@ -322,63 +311,77 @@ private JobCoordinatorListener createJobCoordinatorListener() { @Override public void onJobModelExpired() { - synchronized (lock) { - if (state == State.STARTED || state == State.RUNNING) { - state = State.IN_REBALANCE; - LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", container, processorId); - boolean hasContainerShutdown = stopSamzaContainer(); - if (!hasContainerShutdown) { - LOGGER.warn("Container: {} shutdown was unsuccessful. Stopping the stream processor: {}.", container, processorId); - state = State.STOPPING; - jobCoordinator.stop(); - } else { - LOGGER.info("Container: {} shutdown completed for stream processor: {}.", container, processorId); - } - } else { - LOGGER.info("Ignoring onJobModelExpired invocation since the current state is {} and not in {}.", state, ImmutableList.of(State.RUNNING, State.STARTED)); + if (compareAndSet(state, ImmutableSet.of(STARTED, RUNNING), START_REBALANCE)) { + LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", container, processorId); + + // attempt to stop the container + stopSamzaContainer(); + + // transition to IN_REBALANCE with container shutdown latch as the barrier + boolean inRebalance = transitionWithBarrier(state, START_REBALANCE + , IN_REBALANCE, containerShutdownLatch, Duration.ofMillis(taskShutdownMs)); + + // failed to transition to IN_REBALANCE either container shutdown failed or barrier timed out + if (!inRebalance) { + LOGGER.warn("Container: {} shutdown was unsuccessful. Stopping the stream processor: {}.", + container, processorId); + jobCoordinator.stop(); } + } else { + LOGGER.info("Ignoring onJobModelExpired since the current state is {} and not in {}.", + state.get(), ImmutableSet.of(RUNNING, STARTED)); } } @Override public void onNewJobModel(String processorId, JobModel jobModel) { - synchronized (lock) { - if (state == State.IN_REBALANCE) { - containerShutdownLatch = new CountDownLatch(1); - container = createSamzaContainer(processorId, jobModel); - container.setContainerListener(new ContainerListener()); - LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId); - executorService.submit(container); - } else { - LOGGER.info("Ignoring onNewJobModel invocation since the current state is {} and not {}.", state, State.IN_REBALANCE); - } + if (state.get() == IN_REBALANCE) { + containerShutdownLatch = new CountDownLatch(1); + container = createSamzaContainer(processorId, jobModel); + container.setContainerListener(new ContainerListener()); + LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId); + executorService.submit(container); + } else { + LOGGER.info("Ignoring onNewJobModel invocation since the current state is {} and not {}.", + state.get(), State.IN_REBALANCE); } } @Override public void onCoordinatorStop() { - synchronized (lock) { + if (compareNotInAndSet(state, ImmutableSet.of(STOPPING, STOPPED), STOPPING)) { LOGGER.info("Shutting down the executor service of the stream processor: {}.", processorId); stopSamzaContainer(); - executorService.shutdownNow(); - state = State.STOPPED; } - if (containerException != null) + + boolean stopped = transitionWithBarrier(state, STOPPING, STOPPED, containerShutdownLatch, + Duration.ofMillis(taskShutdownMs)); + + if (containerException != null) { processorListener.afterFailure(containerException); - else + } else if (stopped) { processorListener.afterStop(); - + } else { + executorService.shutdownNow(); + processorListener.afterFailure(new SamzaException("Samza container did not shutdown cleanly.")); + } } @Override public void onCoordinatorFailure(Throwable throwable) { - synchronized (lock) { - LOGGER.info(String.format("Coordinator: %s failed with an exception. Stopping the stream processor: %s. Original exception:", jobCoordinator, processorId), throwable); + if (compareNotInAndSet(state, ImmutableSet.of(STOPPING, STOPPED), STOPPING)) { + LOGGER.info(String.format("Coordinator: %s failed with an exception. Stopping the stream processor: %s." + + " Original exception:", jobCoordinator, processorId), throwable); stopSamzaContainer(); - executorService.shutdownNow(); - state = State.STOPPED; + + if (!transitionWithBarrier(state, STOPPING, STOPPED, containerShutdownLatch + , Duration.ofMillis(taskShutdownMs))) { + executorService.shutdownNow(); + LOGGER.warn("Failed to transition from {} to {}.", STOPPING, STOPPED); + } + + processorListener.afterFailure(throwable); } - processorListener.afterFailure(throwable); } }; } @@ -402,36 +405,43 @@ public void beforeStart() { @Override public void afterStart() { - LOGGER.warn("Received container start notification for container: {} in stream processor: {}.", container, processorId); + LOGGER.warn("Received container start notification for container: {} in stream processor: {}.", + container, processorId); if (!processorOnStartCalled) { processorListener.afterStart(); processorOnStartCalled = true; } - state = State.RUNNING; + + if (compareAndSet(state, ImmutableSet.of(STARTED, IN_REBALANCE), RUNNING)) { + LOGGER.info("Stream processor started!"); + } else { + LOGGER.info("Invalid state transition from {} to {}", state.get(), RUNNING); + } } @Override public void afterStop() { containerShutdownLatch.countDown(); - synchronized (lock) { - if (state == State.IN_REBALANCE) { - LOGGER.info("Container: {} of the stream processor: {} was stopped by the JobCoordinator.", container, processorId); - } else { - LOGGER.info("Container: {} stopped. Stopping the stream processor: {}.", container, processorId); - state = State.STOPPING; - jobCoordinator.stop(); - } + + if (compareNotInAndSet(state, ImmutableSet.of(STOPPING, STOPPED, START_REBALANCE), STOPPING)) { + LOGGER.info("Container: {} stopped. Stopping the stream processor: {}.", container, processorId); + jobCoordinator.stop(); + } else { + LOGGER.info("Container: {} stopped.", container); } } @Override public void afterFailure(Throwable t) { + containerException = t; containerShutdownLatch.countDown(); - synchronized (lock) { - LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s. Original exception:", container, processorId), t); - state = State.STOPPING; - containerException = t; + + if (compareNotInAndSet(state, ImmutableSet.of(STOPPING, STOPPED), STOPPING)) { + LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s." + + " Original exception:", container, processorId), t); jobCoordinator.stop(); + } else { + LOGGER.error("Container: %s failed with an exception {}.", container, t); } } } diff --git a/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java b/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java new file mode 100644 index 0000000000..c342ff4d32 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.util; + +import java.time.Duration; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class StateTransitionUtil { + private static final Logger LOG = LoggerFactory.getLogger(StateTransitionUtil.class); + + /** + * + * @param reference + * @param expectedValues + * @param value + * @param + * @return + */ + public static boolean compareAndSet(AtomicReference reference, Set expectedValues, T value) { + T currentValue = reference.get(); + if (expectedValues.contains(currentValue)) { + return reference.compareAndSet(currentValue, value); + } + + return false; + } + + /** + * + * @param reference + * @param blacklistedValues + * @param value + * @param + * @return + */ + public static boolean compareNotInAndSet(AtomicReference reference, Set blacklistedValues, T value) { + T currentValue = reference.get(); + if (blacklistedValues.contains(currentValue)) { + return false; + } + + return reference.compareAndSet(currentValue, value); + } + + /** + * + * @param reference + * @param currentState + * @param desiredState + * @param barrier + * @param timeout + * @param + * @return + */ + public static boolean transitionWithBarrier(AtomicReference reference, T currentState, T desiredState, + CountDownLatch barrier, Duration timeout) { + if (reference.get() != currentState) { + LOG.error("Failed to transition from {} to {}", currentState, desiredState); + throw new IllegalStateException("Cannot transition to " + desiredState + " from state " + currentState + + " since the current state is " + reference.get()); + } + + try { + if (timeout == Duration.ZERO) { + barrier.await(); + } else { + boolean completed = barrier.await(timeout.toMillis(), TimeUnit.MILLISECONDS); + if (!completed) { + LOG.error("Failed to transition from {} to {} due to barrier timeout.", currentState, desiredState); + } + } + } catch (InterruptedException e) { + LOG.error("Failed to transition from {} to {} due to {}", new Object[] {currentState, desiredState, e}); + return false; + } + + return reference.compareAndSet(currentState, desiredState); + } +} diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java index 673015aa56..786aecfca7 100644 --- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java +++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java @@ -370,7 +370,8 @@ public void testOnJobModelExpiredShouldMakeCorrectStateTransitions() { .thenReturn(SamzaContainerStatus.STARTED) .thenReturn(SamzaContainerStatus.STOPPED); streamProcessor.container = mockSamzaContainer; - streamProcessor.state = State.STARTED; + streamProcessor.containerShutdownLatch = new CountDownLatch(1); + streamProcessor.state.set(State.STARTED); streamProcessor.jobCoordinatorListener.onJobModelExpired(); @@ -379,15 +380,15 @@ public void testOnJobModelExpiredShouldMakeCorrectStateTransitions() { Mockito.verify(mockJobCoordinator, Mockito.times(1)).stop(); // If StreamProcessor is in IN_REBALANCE state, onJobModelExpired should be a NO_OP. - streamProcessor.state = State.IN_REBALANCE; + streamProcessor.state.set(State.IN_REBALANCE); streamProcessor.jobCoordinatorListener.onJobModelExpired(); - assertEquals(State.IN_REBALANCE, streamProcessor.state); + assertEquals(State.IN_REBALANCE, streamProcessor.getState()); } @Test - public void testOnNewJobModelShouldResultInValidStateTransitions() throws Exception { + public void testOnNewJobModelShouldResultInValidStateTransitions() { JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class); ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class); SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class); @@ -395,7 +396,7 @@ public void testOnNewJobModelShouldResultInValidStateTransitions() throws Except StreamProcessor streamProcessor = PowerMockito.spy(new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator)); streamProcessor.container = mockSamzaContainer; - streamProcessor.state = State.IN_REBALANCE; + streamProcessor.state.set(State.IN_REBALANCE); Mockito.doNothing().when(mockSamzaContainer).run(); streamProcessor.jobCoordinatorListener.onNewJobModel("TestProcessorId", new JobModel(new MapConfig(), new HashMap<>())); @@ -418,11 +419,11 @@ public void testStopShouldBeIdempotent() { .thenReturn(SamzaContainerStatus.STARTED) .thenReturn(SamzaContainerStatus.STOPPED); - streamProcessor.state = State.RUNNING; + streamProcessor.state.set(State.RUNNING); streamProcessor.stop(); - assertEquals(State.STOPPING, streamProcessor.state); + assertEquals(State.STOPPING, streamProcessor.getState()); } @Test @@ -436,13 +437,13 @@ public void testCoordinatorFailureShouldStopTheStreamProcessor() { Exception failureException = new Exception("dummy exception"); streamProcessor.container = mockSamzaContainer; - streamProcessor.state = State.RUNNING; + streamProcessor.state.set(State.RUNNING); streamProcessor.jobCoordinatorListener.onCoordinatorFailure(failureException); Mockito.doNothing().when(mockSamzaContainer).shutdown(); Mockito.when(mockSamzaContainer.hasStopped()).thenReturn(false); - assertEquals(State.STOPPED, streamProcessor.state); + assertEquals(State.STOPPED, streamProcessor.state.get()); Mockito.verify(lifecycleListener).afterFailure(failureException); Mockito.verify(mockSamzaContainer).shutdown(); } @@ -454,10 +455,10 @@ public void testCoordinatorStopShouldStopTheStreamProcessor() { MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); StreamProcessor streamProcessor = new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator); - streamProcessor.state = State.RUNNING; + streamProcessor.state.set(State.RUNNING); streamProcessor.jobCoordinatorListener.onCoordinatorStop(); - assertEquals(State.STOPPED, streamProcessor.state); + assertEquals(State.STOPPED, streamProcessor.getState()); Mockito.verify(lifecycleListener).afterStop(); } diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java new file mode 100644 index 0000000000..4bc698e060 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.test.framework; + +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.application.StreamApplicationDescriptor; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; +import org.apache.samza.test.operator.data.PageView; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.samza.test.framework.TestTimerApp.*; + + +public class FaultInjectionTest extends StreamApplicationIntegrationTestHarness { + + @Before + public void setup() { + // create topics + createTopic(PAGE_VIEWS, 2); + + // create events for the following user activity. + // userId: (viewId, pageId, (adIds)) + // u1: (v1, p1, (a1)), (v2, p2, (a3)) + // u2: (v3, p1, (a1)), (v4, p3, (a5)) + produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}"); + produceMessage(PAGE_VIEWS, 1, "p2", "{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}"); + } + + @Test + public void testRaceCondition() throws InterruptedException { + Map configs = new HashMap<>(); + configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory"); + configs.put("job.systemstreampartition.grouper.factory", "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory"); + configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.SingleContainerGrouperFactory"); + configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); + configs.put(FaultInjectionStreamApp.INPUT_TOPIC_NAME_PROP, "page-views"); + configs.put("task.shutdown.ms", "10000"); + configs.put(JobConfig.PROCESSOR_ID(), "0"); + + RunApplicationContext context = + runApplication(new FaultInjectionStreamApp(), "fault-injection-app", configs); + Thread.sleep(1000); + context.getRunner().kill(); + context.getRunner().waitForFinish(); + System.out.println("Application status: " + context.getRunner().status()); + } + + private static class FaultInjectionStreamApp implements StreamApplication { + public static final String SYSTEM = "kafka"; + public static final String INPUT_TOPIC_NAME_PROP = "inputTopicName"; + + @Override + public void describe(StreamApplicationDescriptor appDesc) { + Config config = appDesc.getConfig(); + String inputTopic = config.get(INPUT_TOPIC_NAME_PROP); + + final JsonSerdeV2 serde = new JsonSerdeV2<>(PageView.class); + KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM); + KafkaInputDescriptor isd = ksd.getInputDescriptor(inputTopic, serde); + final MessageStream broadcastPageViews = appDesc + .getInputStream(isd) + .map(message -> { + throw new RuntimeException("failed"); + }); + } + } +} From 3653abd786b0d280f61db5f9d965415c8d193a8c Mon Sep 17 00:00:00 2001 From: bharathkk Date: Tue, 25 Sep 2018 10:27:03 -0700 Subject: [PATCH 2/7] Fixed additional tests and reverted unnecessary changes --- .../samza/processor/StreamProcessor.java | 4 ++-- .../apache/samza/util/StateTransitionUtil.java | 3 ++- .../samza/processor/TestStreamProcessor.java | 18 +++++++++++++----- .../TestZkLocalApplicationRunner.java | 7 +++---- 4 files changed, 20 insertions(+), 12 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 61664a96ca..8c66a786f4 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -405,7 +405,7 @@ public void beforeStart() { @Override public void afterStart() { - LOGGER.warn("Received container start notification for container: {} in stream processor: {}.", + LOGGER.info("Received container start notification for container: {} in stream processor: {}.", container, processorId); if (!processorOnStartCalled) { processorListener.afterStart(); @@ -423,7 +423,7 @@ public void afterStart() { public void afterStop() { containerShutdownLatch.countDown(); - if (compareNotInAndSet(state, ImmutableSet.of(STOPPING, STOPPED, START_REBALANCE), STOPPING)) { + if (compareNotInAndSet(state, ImmutableSet.of(STOPPING, STOPPED, START_REBALANCE, IN_REBALANCE), STOPPING)) { LOGGER.info("Container: {} stopped. Stopping the stream processor: {}.", container, processorId); jobCoordinator.stop(); } else { diff --git a/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java b/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java index c342ff4d32..f6310e39ca 100644 --- a/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java +++ b/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java @@ -83,12 +83,13 @@ public static boolean transitionWithBarrier(AtomicReference reference, T } try { - if (timeout == Duration.ZERO) { + if (timeout.isNegative()) { barrier.await(); } else { boolean completed = barrier.await(timeout.toMillis(), TimeUnit.MILLISECONDS); if (!completed) { LOG.error("Failed to transition from {} to {} due to barrier timeout.", currentState, desiredState); + return false; } } } catch (InterruptedException e) { diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java index 786aecfca7..4b8bc99c2e 100644 --- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java +++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java @@ -45,11 +45,15 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.Mockito; import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.core.classloader.annotations.PrepareOnlyThisForTest; +import org.powermock.modules.junit4.PowerMockRunner; import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.*; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -363,7 +367,10 @@ public void testOnJobModelExpiredShouldMakeCorrectStateTransitions() { * state and should shutdown JobCoordinator. */ Mockito.doNothing().when(mockJobCoordinator).start(); - Mockito.doNothing().when(mockJobCoordinator).stop(); + Mockito.doAnswer(ans -> { + streamProcessor.state.set(State.STOPPING); + return null; + }).when(mockJobCoordinator).stop(); Mockito.doNothing().when(mockSamzaContainer).shutdown(); Mockito.when(mockSamzaContainer.hasStopped()).thenReturn(false); Mockito.when(mockSamzaContainer.getStatus()) @@ -393,15 +400,16 @@ public void testOnNewJobModelShouldResultInValidStateTransitions() { ProcessorLifecycleListener lifecycleListener = Mockito.mock(ProcessorLifecycleListener.class); SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class); MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0")); - StreamProcessor streamProcessor = PowerMockito.spy(new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator)); + StreamProcessor streamProcessor = new TestableStreamProcessor(config, new HashMap<>(), null, + lifecycleListener, mockJobCoordinator, mockSamzaContainer); - streamProcessor.container = mockSamzaContainer; streamProcessor.state.set(State.IN_REBALANCE); Mockito.doNothing().when(mockSamzaContainer).run(); streamProcessor.jobCoordinatorListener.onNewJobModel("TestProcessorId", new JobModel(new MapConfig(), new HashMap<>())); - Mockito.verify(mockSamzaContainer, Mockito.atMost(1)).run(); + Mockito.verify(mockSamzaContainer, Mockito.times(1)).setContainerListener(any()); + Mockito.verify(mockSamzaContainer, Mockito.atLeast(1)).run(); } @Test diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index b249d4d287..9684105219 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -633,13 +633,12 @@ public void testShouldStopStreamApplicationWhenShutdownTimeOutIsLessThanContaine /** * If the processing has started in the third stream processor, then other two stream processors should be stopped. */ - // TODO: This is a bug! Status should be unsuccessful finish. - assertEquals(ApplicationStatus.SuccessfulFinish, appRunner1.status()); - assertEquals(ApplicationStatus.SuccessfulFinish, appRunner2.status()); + assertEquals(ApplicationStatus.UnsuccessfulFinish, appRunner1.status()); + assertEquals(ApplicationStatus.UnsuccessfulFinish, appRunner2.status()); appRunner3.kill(); appRunner3.waitForFinish(); - assertEquals(ApplicationStatus.SuccessfulFinish, appRunner3.status()); + assertEquals(ApplicationStatus.UnsuccessfulFinish, appRunner3.status()); } } From d954b90eea8098bdaa86601b76b832ce27d39816 Mon Sep 17 00:00:00 2001 From: bharathkk Date: Tue, 25 Sep 2018 15:07:39 -0700 Subject: [PATCH 3/7] Cleaned up log statements and minor edits --- .../samza/processor/StreamProcessor.java | 164 ++++++++++-------- .../samza/util/StateTransitionUtil.java | 91 ++++++---- .../test/framework/FaultInjectionTest.java | 31 ++-- .../TestZkLocalApplicationRunner.java | 5 +- 4 files changed, 162 insertions(+), 129 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 8c66a786f4..b9a2682cc1 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -70,27 +70,27 @@ * Describes the valid state transitions of the {@link StreamProcessor}. * * - * ──────────────────────────────── - * │ │ - * │ │ - * │ │ - * │ │ - * New StreamProcessor.start() Rebalance triggered V Receives JobModel │ - * StreamProcessor ──────────▶ NEW ───────────────────────────▶ STARTED ──────────────────▶ IN_REBALANCE ─────────────────────▶ RUNNING - * Creation │ │ by group leader │ and starts Container │ - * │ │ │ │ - * Stre│amProcessor.stop() Stre│amProcessor.stop() Stre│amProcessor.stop() Stre│amProcessor.stop() - * │ │ │ │ - * │ │ │ │ - * │ │ │ │ - * V V V V - * ───────────────────────────▶ STOPPING ────────────────────────────────────────────────────────── - * │ - * │ - * After JobCoordinator and SamzaContainer had shutdown. - * │ - * V - * STOPPED + * ─────────────────────────────── + * │ │ + * │ │ + * │ │ + * │ │ + * New StreamProcessor.start() Rebalance triggered V Receives JobModel │ + * StreamProcessor ──────────▶ NEW ───────────────────────────▶ STARTED ─────────────────–––––––––––––––––––––─▶ START_REBALANCE ––––––––––––––––▶ IN_REBALANCE ─────────────────────▶ RUNNING + * Creation │ │ by group leader and starts Container │ │ + * │ │ │ │ + * Stre│amProcessor.stop() Stre│amProcessor.stop() Stre│amProcessor.stop() Stre│amProcessor.stop() + * │ │ │ │ + * │ │ │ │ + * │ │ │ │ + * V V V V + * ───────────–––––––––––––––––––––––––––––––––––––––––––––──────────────––––––––──▶ STOPPING ──────────────────────────────────────────────────────────––– + * │ + * │ + * After JobCoordinator and SamzaContainer had shutdown. + * │ + * V + * STOPPED * * */ @@ -110,13 +110,22 @@ public class StreamProcessor { private Throwable containerException = null; + @VisibleForTesting + AtomicReference state = new AtomicReference<>(NEW); + + @VisibleForTesting + SamzaContainer container = null; + + @VisibleForTesting + JobCoordinatorListener jobCoordinatorListener = null; + volatile CountDownLatch containerShutdownLatch = new CountDownLatch(0); /** * Indicates the current status of a {@link StreamProcessor}. */ public enum State { - STARTING("STARTING"), STARTED("STARTED"), RUNNING("RUNNING"), STOPPING("STOPPING"), STOPPED("STOPPED"), NEW("NEW"), + STARTED("STARTED"), RUNNING("RUNNING"), STOPPING("STOPPING"), STOPPED("STOPPED"), NEW("NEW"), START_REBALANCE("START_REBALANCE"), IN_REBALANCE("IN_REBALANCE"); private String strVal; @@ -138,15 +147,6 @@ public State getState() { return state.get(); } - @VisibleForTesting - AtomicReference state = new AtomicReference<>(NEW); - - @VisibleForTesting - SamzaContainer container = null; - - @VisibleForTesting - JobCoordinatorListener jobCoordinatorListener = null; - /** * StreamProcessor encapsulates and manages the lifecycle of {@link JobCoordinator} and {@link SamzaContainer}. * @@ -218,16 +218,17 @@ public StreamProcessor(Config config, Map customMetrics *

*/ public void start() { - if (state.compareAndSet(NEW, STARTING)) { + if (state.get() == NEW) { processorListener.beforeStart(); - } else { - LOGGER.info("Stream processor has already been initialized and the current state {}", state.get()); - } - - if (state.compareAndSet(STARTING, STARTED)) { jobCoordinator.start(); + + if (!state.compareAndSet(NEW, STARTED)) { + LOGGER.info("Failed to transition to {} from {}Stream processor has already started and the current state {}", state.get()); + // todo: Should stop be invoked or do we throw exception? + // todo: We ideally want to call stop to make sure resources spun by jobcoordinator.start() are cleaned up + } } else { - LOGGER.info("Stream processor has already started and the current state {}", state.get()); + LOGGER.info("Stream processor has already been initialized and the current state {}", state.get()); } } @@ -258,14 +259,13 @@ public void start() { */ public void stop() { if (compareNotInAndSet(state, ImmutableSet.of(STOPPING, STOPPED), STOPPING)) { - LOGGER.info("Shutting down the container: {} of stream processor: {}.", container, processorId); + LOGGER.info("Shutting down stream processor: {}", processorId); stopSamzaContainer(); - LOGGER.info("Shutting down JobCoordinator of stream processor: {}.", processorId); + LOGGER.info("Shutting down job coordinator for the stream processor: {}", processorId); jobCoordinator.stop(); } else if (state.get() == STOPPING || state.get() == STOPPED) { - // do we need to wait here or can we move on - LOGGER.info("Stream processor is shutting down with state {}", state.get()); + LOGGER.info("Shutdown in progress for stream processor: {} with state {}", processorId, state.get()); } } @@ -293,6 +293,7 @@ private JobCoordinator createJobCoordinator() { * Stops the {@link SamzaContainer}. */ private void stopSamzaContainer() { + LOGGER.info("Shutting down the container: {} for the stream processor: {}", container, processorId); if (container != null) { if (!container.hasStopped()) { try { @@ -301,7 +302,7 @@ private void stopSamzaContainer() { LOGGER.info(String.format("Cannot shutdown container: %s for stream processor: %s. Container is not running.", container, processorId), icse); } } else { - LOGGER.info("Container is not instantiated for stream processor: {}.", processorId); + LOGGER.info("Container is not instantiated for the stream processor: {}", processorId); } } } @@ -312,20 +313,20 @@ private JobCoordinatorListener createJobCoordinatorListener() { @Override public void onJobModelExpired() { if (compareAndSet(state, ImmutableSet.of(STARTED, RUNNING), START_REBALANCE)) { - LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", container, processorId); - - // attempt to stop the container + LOGGER.info("Job model expired for the stream processor: {}", processorId); stopSamzaContainer(); - // transition to IN_REBALANCE with container shutdown latch as the barrier + LOGGER.info("Entering re-balance phase with a barrier on container shutdown for the stream processor: {}", processorId); boolean inRebalance = transitionWithBarrier(state, START_REBALANCE , IN_REBALANCE, containerShutdownLatch, Duration.ofMillis(taskShutdownMs)); // failed to transition to IN_REBALANCE either container shutdown failed or barrier timed out if (!inRebalance) { - LOGGER.warn("Container: {} shutdown was unsuccessful. Stopping the stream processor: {}.", - container, processorId); + LOGGER.warn("Failed to transition to re-balance phase." + + " Stopping the stream processor: {} due to unclean container shutdown.", processorId); jobCoordinator.stop(); + } else { + LOGGER.info("Successfully transitioned to re-balance phase for the stream processor: {}", processorId); } } else { LOGGER.info("Ignoring onJobModelExpired since the current state is {} and not in {}.", @@ -336,6 +337,7 @@ public void onJobModelExpired() { @Override public void onNewJobModel(String processorId, JobModel jobModel) { if (state.get() == IN_REBALANCE) { + LOGGER.info("New job model received for the stream processor: {}", processorId); containerShutdownLatch = new CountDownLatch(1); container = createSamzaContainer(processorId, jobModel); container.setContainerListener(new ContainerListener()); @@ -349,38 +351,55 @@ public void onNewJobModel(String processorId, JobModel jobModel) { @Override public void onCoordinatorStop() { + LOGGER.info("Received shutdown request from job coordinator. Shutting down the stream processor: {}", processorId); + + // Stop samza container only when stream processor state is not in STOPPING or STOPPED if (compareNotInAndSet(state, ImmutableSet.of(STOPPING, STOPPED), STOPPING)) { - LOGGER.info("Shutting down the executor service of the stream processor: {}.", processorId); stopSamzaContainer(); } - boolean stopped = transitionWithBarrier(state, STOPPING, STOPPED, containerShutdownLatch, - Duration.ofMillis(taskShutdownMs)); + if (state.get() == STOPPING) { + LOGGER.info("Attempting to shutdown stream processor: {} with a barrier on container shutdown.", processorId); + boolean stopped = transitionWithBarrier(state, STOPPING, STOPPED, containerShutdownLatch, Duration.ofMillis(taskShutdownMs)); + + if (containerException != null) { + processorListener.afterFailure(containerException); + } else if (stopped) { + processorListener.afterStop(); + } else { + executorService.shutdownNow(); + processorListener.afterFailure(new SamzaException("Samza container did not shutdown cleanly.")); + } - if (containerException != null) { - processorListener.afterFailure(containerException); - } else if (stopped) { - processorListener.afterStop(); + LOGGER.info("Shutdown for the stream processor: {} completed with status={}" + , processorId, containerException == null && stopped); } else { - executorService.shutdownNow(); - processorListener.afterFailure(new SamzaException("Samza container did not shutdown cleanly.")); + LOGGER.info("Ignoring shutdown request since the current state is {} and not {} for the stream processor: {}" + , new Object[] {state.get(), STOPPING, processorId}); } } @Override public void onCoordinatorFailure(Throwable throwable) { + LOGGER.info("Received shutdown request for the stream processor: {} from job coordinator due to {}" + , processorId, throwable); + if (compareNotInAndSet(state, ImmutableSet.of(STOPPING, STOPPED), STOPPING)) { - LOGGER.info(String.format("Coordinator: %s failed with an exception. Stopping the stream processor: %s." - + " Original exception:", jobCoordinator, processorId), throwable); stopSamzaContainer(); + } - if (!transitionWithBarrier(state, STOPPING, STOPPED, containerShutdownLatch - , Duration.ofMillis(taskShutdownMs))) { + if (state.get() == STOPPING) { + LOGGER.info("Attempting to shutdown stream processor: {} with a barrier on container shutdown.", processorId); + if (!transitionWithBarrier(state, STOPPING, STOPPED, containerShutdownLatch, Duration.ofMillis(taskShutdownMs))) { executorService.shutdownNow(); - LOGGER.warn("Failed to transition from {} to {}.", STOPPING, STOPPED); } + // todo: Should we prioritize which exception to throw to the processor listener? processorListener.afterFailure(throwable); + LOGGER.info("Shutdown for the stream processor: {} completed with status=false"); + } else { + LOGGER.info("Ignoring shutdown request since the current state is {} and not {} for the stream processor: {}" + , new Object[] {state.get(), STOPPING, processorId}); } } }; @@ -405,14 +424,14 @@ public void beforeStart() { @Override public void afterStart() { - LOGGER.info("Received container start notification for container: {} in stream processor: {}.", + LOGGER.info("Received container start notification from container: {} for stream processor: {}", container, processorId); - if (!processorOnStartCalled) { - processorListener.afterStart(); - processorOnStartCalled = true; - } if (compareAndSet(state, ImmutableSet.of(STARTED, IN_REBALANCE), RUNNING)) { + if (!processorOnStartCalled) { + processorListener.afterStart(); + processorOnStartCalled = true; + } LOGGER.info("Stream processor started!"); } else { LOGGER.info("Invalid state transition from {} to {}", state.get(), RUNNING); @@ -421,27 +440,24 @@ public void afterStart() { @Override public void afterStop() { + LOGGER.info("Received stop notification from container: {} for stream processor: {}", container, processorId); containerShutdownLatch.countDown(); if (compareNotInAndSet(state, ImmutableSet.of(STOPPING, STOPPED, START_REBALANCE, IN_REBALANCE), STOPPING)) { LOGGER.info("Container: {} stopped. Stopping the stream processor: {}.", container, processorId); jobCoordinator.stop(); - } else { - LOGGER.info("Container: {} stopped.", container); } } @Override public void afterFailure(Throwable t) { + LOGGER.info("Received failure notification from container: {} for stream processor: {}", container, processorId); containerException = t; containerShutdownLatch.countDown(); if (compareNotInAndSet(state, ImmutableSet.of(STOPPING, STOPPED), STOPPING)) { - LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s." - + " Original exception:", container, processorId), t); + LOGGER.error("Stopping the stream processor: {} due to container exception {}", processorId, t); jobCoordinator.stop(); - } else { - LOGGER.error("Container: %s failed with an exception {}.", container, t); } } } diff --git a/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java b/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java index f6310e39ca..9a0510d746 100644 --- a/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java +++ b/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java @@ -27,58 +27,77 @@ import org.slf4j.LoggerFactory; +/** + * A utility class to perform complex state transitions. + */ public class StateTransitionUtil { private static final Logger LOG = LoggerFactory.getLogger(StateTransitionUtil.class); /** + * Atomically sets the value to the given updated value if current value is one of the expected values. + * + * @param reference the atomic reference + * @param expectedValues set of expected values + * @param update the new value + * @param type of the atomic reference * - * @param reference - * @param expectedValues - * @param value - * @param - * @return + * @return true if current state is one of the expected value and transition was successful; false otherwise */ - public static boolean compareAndSet(AtomicReference reference, Set expectedValues, T value) { - T currentValue = reference.get(); - if (expectedValues.contains(currentValue)) { - return reference.compareAndSet(currentValue, value); + public static boolean compareAndSet(AtomicReference reference, Set expectedValues, T update) { + while(true) { + T currentValue = reference.get(); + if (expectedValues.contains(currentValue)) { + if (reference.compareAndSet(currentValue, update)) { + return true; + } + } else { + return false; + } } - - return false; } /** + * Atomically sets the value to the given updated value if the black listed values does not contain current value. + * + * @param reference the atomic reference + * @param blacklistedValues set of blacklisted values + * @param update the new value + * @param type of the atomic reference * - * @param reference - * @param blacklistedValues - * @param value - * @param - * @return + * @return true if current state is not in the blacklisted values and transition was successful; false otherwise */ - public static boolean compareNotInAndSet(AtomicReference reference, Set blacklistedValues, T value) { - T currentValue = reference.get(); - if (blacklistedValues.contains(currentValue)) { - return false; - } + public static boolean compareNotInAndSet(AtomicReference reference, Set blacklistedValues, T update) { + while(true) { + T currentValue = reference.get(); + if (blacklistedValues.contains(currentValue)) { + return false; + } - return reference.compareAndSet(currentValue, value); + if(reference.compareAndSet(currentValue, update)) { + return true; + } + } } /** + * Atomically sets the value to the updated value if the current value == expected value and the barrier + * latch counts down to zero. If the barrier times out determined by the timeout parameter, the atomic reference + * is not updated. + * + * @param reference the atomic reference + * @param expect the expected value + * @param update the new value + * @param barrier the barrier latch + * @param timeout the timeout for barrier + * @param type of the atomic reference * - * @param reference - * @param currentState - * @param desiredState - * @param barrier - * @param timeout - * @param - * @return + * @return true if current state == expected value and the barrier completed and transition was successful; false otherwise */ - public static boolean transitionWithBarrier(AtomicReference reference, T currentState, T desiredState, + public static boolean transitionWithBarrier(AtomicReference reference, T expect, T update, CountDownLatch barrier, Duration timeout) { - if (reference.get() != currentState) { - LOG.error("Failed to transition from {} to {}", currentState, desiredState); - throw new IllegalStateException("Cannot transition to " + desiredState + " from state " + currentState + if (reference.get() != expect) { + LOG.error("Failed to transition from {} to {}", expect, update); + throw new IllegalStateException("Cannot transition to " + update + " from state " + expect + " since the current state is " + reference.get()); } @@ -88,15 +107,15 @@ public static boolean transitionWithBarrier(AtomicReference reference, T } else { boolean completed = barrier.await(timeout.toMillis(), TimeUnit.MILLISECONDS); if (!completed) { - LOG.error("Failed to transition from {} to {} due to barrier timeout.", currentState, desiredState); + LOG.error("Failed to transition from {} to {} due to barrier timeout.", expect, update); return false; } } } catch (InterruptedException e) { - LOG.error("Failed to transition from {} to {} due to {}", new Object[] {currentState, desiredState, e}); + LOG.error("Failed to transition from {} to {} due to {}", new Object[] {expect, update, e}); return false; } - return reference.compareAndSet(currentState, desiredState); + return reference.compareAndSet(expect, update); } } diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java index 4bc698e060..9210db3791 100644 --- a/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java +++ b/samza-test/src/test/java/org/apache/samza/test/framework/FaultInjectionTest.java @@ -25,32 +25,19 @@ import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.job.ApplicationStatus; import org.apache.samza.operators.MessageStream; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.system.kafka.KafkaInputDescriptor; import org.apache.samza.system.kafka.KafkaSystemDescriptor; import org.apache.samza.test.operator.data.PageView; -import org.junit.Before; import org.junit.Test; import static org.apache.samza.test.framework.TestTimerApp.*; +import static org.junit.Assert.*; public class FaultInjectionTest extends StreamApplicationIntegrationTestHarness { - - @Before - public void setup() { - // create topics - createTopic(PAGE_VIEWS, 2); - - // create events for the following user activity. - // userId: (viewId, pageId, (adIds)) - // u1: (v1, p1, (a1)), (v2, p2, (a3)) - // u2: (v3, p1, (a1)), (v4, p3, (a5)) - produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}"); - produceMessage(PAGE_VIEWS, 1, "p2", "{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}"); - } - @Test public void testRaceCondition() throws InterruptedException { Map configs = new HashMap<>(); @@ -59,15 +46,23 @@ public void testRaceCondition() throws InterruptedException { configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.SingleContainerGrouperFactory"); configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory"); configs.put(FaultInjectionStreamApp.INPUT_TOPIC_NAME_PROP, "page-views"); - configs.put("task.shutdown.ms", "10000"); + configs.put("task.shutdown.ms", "1000"); configs.put(JobConfig.PROCESSOR_ID(), "0"); + createTopic(PAGE_VIEWS, 2); + + // create events for the following user activity. + // userId: (viewId, pageId, (adIds)) + // u1: (v1, p1, (a1)), (v2, p2, (a3)) + // u2: (v3, p1, (a1)), (v4, p3, (a5)) + produceMessage(PAGE_VIEWS, 0, "p1", "{\"viewId\":\"v1\",\"pageId\":\"p1\",\"userId\":\"u1\"}"); + produceMessage(PAGE_VIEWS, 1, "p2", "{\"viewId\":\"v2\",\"pageId\":\"p2\",\"userId\":\"u1\"}"); + RunApplicationContext context = runApplication(new FaultInjectionStreamApp(), "fault-injection-app", configs); - Thread.sleep(1000); context.getRunner().kill(); context.getRunner().waitForFinish(); - System.out.println("Application status: " + context.getRunner().status()); + assertEquals(context.getRunner().status(), ApplicationStatus.UnsuccessfulFinish); } private static class FaultInjectionStreamApp implements StreamApplication { diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index 9684105219..b84886370b 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -617,6 +617,9 @@ public void testShouldStopStreamApplicationWhenShutdownTimeOutIsLessThanContaine // Trigger re-balancing phase, by manually adding a new processor. configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]); + + // Reset the task shutdown ms for 3rd application to give it ample time to shutdown cleanly + configMap.put(TaskConfig.SHUTDOWN_MS(), TASK_SHUTDOWN_MS); Config applicationConfig3 = new MapConfig(configMap); CountDownLatch processedMessagesLatch3 = new CountDownLatch(1); @@ -638,7 +641,7 @@ public void testShouldStopStreamApplicationWhenShutdownTimeOutIsLessThanContaine appRunner3.kill(); appRunner3.waitForFinish(); - assertEquals(ApplicationStatus.UnsuccessfulFinish, appRunner3.status()); + assertEquals(ApplicationStatus.SuccessfulFinish, appRunner3.status()); } } From 638d22208c6068a6a056f73dec20eb6b6df1025e Mon Sep 17 00:00:00 2001 From: bharathkk Date: Tue, 25 Sep 2018 15:27:56 -0700 Subject: [PATCH 4/7] Fix checkstyle errors --- .../org/apache/samza/util/StateTransitionUtil.java | 6 +++--- .../apache/samza/processor/TestStreamProcessor.java | 10 +++------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java b/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java index 9a0510d746..6fa9439652 100644 --- a/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java +++ b/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java @@ -44,7 +44,7 @@ public class StateTransitionUtil { * @return true if current state is one of the expected value and transition was successful; false otherwise */ public static boolean compareAndSet(AtomicReference reference, Set expectedValues, T update) { - while(true) { + while (true) { T currentValue = reference.get(); if (expectedValues.contains(currentValue)) { if (reference.compareAndSet(currentValue, update)) { @@ -67,13 +67,13 @@ public static boolean compareAndSet(AtomicReference reference, Set exp * @return true if current state is not in the blacklisted values and transition was successful; false otherwise */ public static boolean compareNotInAndSet(AtomicReference reference, Set blacklistedValues, T update) { - while(true) { + while (true) { T currentValue = reference.get(); if (blacklistedValues.contains(currentValue)) { return false; } - if(reference.compareAndSet(currentValue, update)) { + if (reference.compareAndSet(currentValue, update)) { return true; } } diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java index 4b8bc99c2e..5282b78927 100644 --- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java +++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java @@ -45,12 +45,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.core.classloader.annotations.PrepareOnlyThisForTest; -import org.powermock.modules.junit4.PowerMockRunner; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.*; @@ -368,9 +364,9 @@ public void testOnJobModelExpiredShouldMakeCorrectStateTransitions() { */ Mockito.doNothing().when(mockJobCoordinator).start(); Mockito.doAnswer(ans -> { - streamProcessor.state.set(State.STOPPING); - return null; - }).when(mockJobCoordinator).stop(); + streamProcessor.state.set(State.STOPPING); + return null; + }).when(mockJobCoordinator).stop(); Mockito.doNothing().when(mockSamzaContainer).shutdown(); Mockito.when(mockSamzaContainer.hasStopped()).thenReturn(false); Mockito.when(mockSamzaContainer.getStatus()) From ac33c6f88424e1a3a4f295a2980e48f709527e6c Mon Sep 17 00:00:00 2001 From: bharathkk Date: Tue, 25 Sep 2018 15:39:43 -0700 Subject: [PATCH 5/7] Fix stream processor tests --- .../java/org/apache/samza/processor/TestStreamProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java index 5282b78927..8846c68fcb 100644 --- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java +++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java @@ -405,7 +405,7 @@ public void testOnNewJobModelShouldResultInValidStateTransitions() { streamProcessor.jobCoordinatorListener.onNewJobModel("TestProcessorId", new JobModel(new MapConfig(), new HashMap<>())); Mockito.verify(mockSamzaContainer, Mockito.times(1)).setContainerListener(any()); - Mockito.verify(mockSamzaContainer, Mockito.atLeast(1)).run(); + Mockito.verify(mockSamzaContainer, Mockito.atMost(1)).run(); } @Test From 0b5fc237ff46d1ad67a71342c3ec86e8d48935d5 Mon Sep 17 00:00:00 2001 From: bharathkk Date: Tue, 25 Sep 2018 18:44:13 -0700 Subject: [PATCH 6/7] Add unit tests for StateTransitionUtil --- .../samza/processor/StreamProcessor.java | 6 +- .../samza/util/StateTransitionUtil.java | 8 +- .../samza/util/TestStateTransitionUtil.java | 163 ++++++++++++++++++ 3 files changed, 170 insertions(+), 7 deletions(-) create mode 100644 samza-core/src/test/java/org/apache/samza/util/TestStateTransitionUtil.java diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index b9a2682cc1..9f4c0c0598 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -317,7 +317,7 @@ public void onJobModelExpired() { stopSamzaContainer(); LOGGER.info("Entering re-balance phase with a barrier on container shutdown for the stream processor: {}", processorId); - boolean inRebalance = transitionWithBarrier(state, START_REBALANCE + boolean inRebalance = compareAndSetWithBarrier(state, START_REBALANCE , IN_REBALANCE, containerShutdownLatch, Duration.ofMillis(taskShutdownMs)); // failed to transition to IN_REBALANCE either container shutdown failed or barrier timed out @@ -360,7 +360,7 @@ public void onCoordinatorStop() { if (state.get() == STOPPING) { LOGGER.info("Attempting to shutdown stream processor: {} with a barrier on container shutdown.", processorId); - boolean stopped = transitionWithBarrier(state, STOPPING, STOPPED, containerShutdownLatch, Duration.ofMillis(taskShutdownMs)); + boolean stopped = compareAndSetWithBarrier(state, STOPPING, STOPPED, containerShutdownLatch, Duration.ofMillis(taskShutdownMs)); if (containerException != null) { processorListener.afterFailure(containerException); @@ -390,7 +390,7 @@ public void onCoordinatorFailure(Throwable throwable) { if (state.get() == STOPPING) { LOGGER.info("Attempting to shutdown stream processor: {} with a barrier on container shutdown.", processorId); - if (!transitionWithBarrier(state, STOPPING, STOPPED, containerShutdownLatch, Duration.ofMillis(taskShutdownMs))) { + if (!compareAndSetWithBarrier(state, STOPPING, STOPPED, containerShutdownLatch, Duration.ofMillis(taskShutdownMs))) { executorService.shutdownNow(); } diff --git a/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java b/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java index 6fa9439652..f90e06e321 100644 --- a/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java +++ b/samza-core/src/main/java/org/apache/samza/util/StateTransitionUtil.java @@ -19,7 +19,7 @@ package org.apache.samza.util; import java.time.Duration; -import java.util.Set; +import java.util.Collection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -43,7 +43,7 @@ public class StateTransitionUtil { * * @return true if current state is one of the expected value and transition was successful; false otherwise */ - public static boolean compareAndSet(AtomicReference reference, Set expectedValues, T update) { + public static boolean compareAndSet(AtomicReference reference, Collection expectedValues, T update) { while (true) { T currentValue = reference.get(); if (expectedValues.contains(currentValue)) { @@ -66,7 +66,7 @@ public static boolean compareAndSet(AtomicReference reference, Set exp * * @return true if current state is not in the blacklisted values and transition was successful; false otherwise */ - public static boolean compareNotInAndSet(AtomicReference reference, Set blacklistedValues, T update) { + public static boolean compareNotInAndSet(AtomicReference reference, Collection blacklistedValues, T update) { while (true) { T currentValue = reference.get(); if (blacklistedValues.contains(currentValue)) { @@ -93,7 +93,7 @@ public static boolean compareNotInAndSet(AtomicReference reference, Set boolean transitionWithBarrier(AtomicReference reference, T expect, T update, + public static boolean compareAndSetWithBarrier(AtomicReference reference, T expect, T update, CountDownLatch barrier, Duration timeout) { if (reference.get() != expect) { LOG.error("Failed to transition from {} to {}", expect, update); diff --git a/samza-core/src/test/java/org/apache/samza/util/TestStateTransitionUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestStateTransitionUtil.java new file mode 100644 index 0000000000..263d99302c --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/util/TestStateTransitionUtil.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.util; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Test; + +import static org.junit.Assert.*; + + +/** + * Test class for {@link StateTransitionUtil} + */ +public class TestStateTransitionUtil { + private static final List STATES = ImmutableList.of("A", "B", "C", "D", "E"); + private static final Map> VALID_TRANSITIONS = new ImmutableMap.Builder>() + .put("A", ImmutableList.of("C", "D")) + .put("B", ImmutableList.of("D", "E")) + .put("C", ImmutableList.of("D", "A", "E")) + .put("D", ImmutableList.of("A", "B")) + .put("E", ImmutableList.of("B", "C")) + .build(); + + private static final Map> INVALID_TRANSITIONS = new ImmutableMap.Builder>() + .put("A", ImmutableList.of("A", "B", "E")) + .put("B", ImmutableList.of("A", "B", "C")) + .put("C", ImmutableList.of("B", "C")) + .put("D", ImmutableList.of("C", "D", "E")) + .put("E", ImmutableList.of("A", "D", "E")) + .build(); + + private static final Random RANDOM = new Random(); + + @Test + public void testCompareAndSet() { + AtomicReference reference = new AtomicReference<>(); + + for (String desiredState : STATES) { + String currentState = STATES.get(RANDOM.nextInt(STATES.size())); + reference.set(currentState); + + boolean actual = StateTransitionUtil.compareAndSet(reference, VALID_TRANSITIONS.get(desiredState), desiredState); + boolean expected = VALID_TRANSITIONS.get(desiredState).contains(currentState); + assertEquals(expected, actual); + } + } + + @Test + public void testCompareNotInAndSet() { + AtomicReference reference = new AtomicReference<>(); + + for (String desiredState : STATES) { + String currentState = STATES.get(RANDOM.nextInt(STATES.size())); + reference.set(currentState); + + boolean actual = StateTransitionUtil.compareNotInAndSet(reference, INVALID_TRANSITIONS.get(desiredState), desiredState); + boolean expected = !INVALID_TRANSITIONS.get(desiredState).contains(currentState); + assertEquals(expected, actual); + } + } + + @Test + public void testCompareAndSetWithBarrierTimeout() { + String currentState = STATES.get(RANDOM.nextInt(STATES.size())); + String desiredState = STATES.get(RANDOM.nextInt(STATES.size())); + AtomicReference reference = new AtomicReference<>(currentState); + + final CountDownLatch barrier = new CountDownLatch(1); + Duration timeout = Duration.ofMillis(RANDOM.nextInt(1000)); + boolean transitionResult = StateTransitionUtil.compareAndSetWithBarrier(reference, currentState, desiredState, barrier, timeout); + + assertFalse(transitionResult); + } + + @Test + public void testCompareAndSetWithBarrierAlreadyCompleted() { + String currentState = STATES.get(RANDOM.nextInt(STATES.size())); + String desiredState = STATES.get(RANDOM.nextInt(STATES.size())); + AtomicReference reference = new AtomicReference<>(currentState); + + final CountDownLatch barrier = new CountDownLatch(0); + Duration timeout = Duration.ofMillis(RANDOM.nextInt(1000)); + boolean transitionResult = StateTransitionUtil.compareAndSetWithBarrier(reference, currentState, desiredState, barrier, timeout); + + assertTrue(transitionResult); + } + + @Test + public void testCompareAndSetWithBarrierCompletionWithStateChange() { + String currentState = STATES.get(RANDOM.nextInt(STATES.size())); + String desiredState = STATES.get(RANDOM.nextInt(STATES.size())); + AtomicReference reference = new AtomicReference<>(currentState); + + final CountDownLatch barrier = new CountDownLatch(1); + Duration timeout = Duration.ofMillis(RANDOM.nextInt(1000)); + + CompletableFuture.runAsync(() -> { + try { + Thread.sleep(timeout.toMillis()); + } catch (InterruptedException e) { + + } + + for (String state : STATES) { + if (!currentState.equals(state)) { + reference.set(state); + break; + } + } + barrier.countDown(); + }); + + boolean transitionResult = StateTransitionUtil.compareAndSetWithBarrier(reference, currentState, desiredState, barrier, Duration.ofMillis(-1)); + assertFalse(transitionResult); + } + + @Test + public void testCompareAndSetWithBarrierCompletion() { + String currentState = STATES.get(RANDOM.nextInt(STATES.size())); + String desiredState = STATES.get(RANDOM.nextInt(STATES.size())); + AtomicReference reference = new AtomicReference<>(currentState); + + final CountDownLatch barrier = new CountDownLatch(1); + Duration timeout = Duration.ofMillis(RANDOM.nextInt(1000)); + + CompletableFuture.runAsync(() -> { + try { + Thread.sleep(timeout.toMillis()); + } catch (InterruptedException e) { + + } + + barrier.countDown(); + }); + + boolean transitionResult = StateTransitionUtil.compareAndSetWithBarrier(reference, currentState, desiredState, barrier, Duration.ofMillis(-1)); + assertTrue(transitionResult); + } +} From ef56da086bbc28f44faa290044ba76a29e3e1282 Mon Sep 17 00:00:00 2001 From: bharathkk Date: Tue, 25 Sep 2018 18:50:39 -0700 Subject: [PATCH 7/7] Minor edits --- .../main/java/org/apache/samza/processor/StreamProcessor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 9f4c0c0598..b1dcf62937 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -223,7 +223,7 @@ public void start() { jobCoordinator.start(); if (!state.compareAndSet(NEW, STARTED)) { - LOGGER.info("Failed to transition to {} from {}Stream processor has already started and the current state {}", state.get()); + LOGGER.info("Failed to transition to STARTED since the current state is {} and not {}", state.get(), NEW); // todo: Should stop be invoked or do we throw exception? // todo: We ideally want to call stop to make sure resources spun by jobcoordinator.start() are cleaned up } @@ -345,7 +345,7 @@ public void onNewJobModel(String processorId, JobModel jobModel) { executorService.submit(container); } else { LOGGER.info("Ignoring onNewJobModel invocation since the current state is {} and not {}.", - state.get(), State.IN_REBALANCE); + state.get(), IN_REBALANCE); } }