diff --git a/README.md b/README.md
index a431281d3a..1a7fdd9f90 100644
--- a/README.md
+++ b/README.md
@@ -46,9 +46,13 @@ To run key-value performance tests:
./gradlew samza-shell:kvPerformanceTest -PconfigPath=file://$PWD/samza-test/src/main/config/perf/kv-perf.properties
-To run all integration tests:
+To run yarn integration tests:
- ./bin/integration-tests.sh
+    ./bin/integration-tests.sh <dir> yarn-integration-tests
+
+To run standalone integration tests:
+
+    ./bin/integration-tests.sh <dir> standalone-integration-tests
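+
+Any additional arguments after the test type are passed through to zopkio as switches, for example:
+
+    ./bin/integration-tests.sh <dir> standalone-integration-tests --console-log-level DEBUG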
### Running checkstyle on the java code ###
diff --git a/bin/integration-tests.sh b/bin/integration-tests.sh
index 14fcd1c030..248236fd34 100755
--- a/bin/integration-tests.sh
+++ b/bin/integration-tests.sh
@@ -19,6 +19,7 @@
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
BASE_DIR=$DIR/..
TEST_DIR=$1
+FAILURE_TEST_TYPE=$2
if test -z "$TEST_DIR"; then
echo
@@ -70,17 +71,25 @@ source $SAMZA_INTEGRATION_TESTS_DIR/bin/activate
# install zopkio and requests
pip install -r $SCRIPTS_DIR/requirements.txt
-# treat all trailing parameters (after dirname) as zopkio switches
+# treat all trailing parameters (after dirname, test_type) as zopkio switches
shift
-SWITCHES="$*"
+SWITCHES="${*:2}"
# default to info-level debugging if not specified
if [[ $SWITCHES != *"console-log-level"* ]]; then
SWITCHES="$SWITCHES --console-log-level INFO"
fi
-# run the tests
-zopkio --config-overrides remote_install_path=$ABS_TEST_DIR $SWITCHES $SCRIPTS_DIR/integration_tests.py
+if [[ ${FAILURE_TEST_TYPE} == "yarn-integration-tests" ]]; then
+ echo "Running yarn integration tests."
+ zopkio --config-overrides remote_install_path=$ABS_TEST_DIR $SWITCHES $SCRIPTS_DIR/integration_tests.py
+elif [[ ${FAILURE_TEST_TYPE} == "standalone-integration-tests" ]]; then
+ echo "Running standalone integration tests."
+ zopkio --config-overrides remote_install_path=$ABS_TEST_DIR $SWITCHES $SCRIPTS_DIR/standalone_integration_tests.py
+else
+  echo "Invalid test type: '$FAILURE_TEST_TYPE'. Valid options: yarn-integration-tests, standalone-integration-tests."
+  exit 1
+fi
# go back to execution directory
deactivate
diff --git a/samza-test/src/main/config/standalone.failure.test.properties b/samza-test/src/main/config/standalone.failure.test.properties
new file mode 100644
index 0000000000..d855d5f020
--- /dev/null
+++ b/samza-test/src/main/config/standalone.failure.test.properties
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+app.runner.class=org.apache.samza.runtime.LocalApplicationRunner
+
+app.class=org.apache.samza.test.integration.TestStandaloneIntegrationApplication
+
+app.name=test-app-name
+app.id=test-app-id
+job.name=test-app-name
+job.id=test-app-id
+
+## Kafka I/O system properties.
+task.inputs=standalone_integration_test_kafka_input_topic
+input.stream.name=standalone_integration_test_kafka_input_topic
+job.default.system=testSystemName
+systems.testSystemName.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.testSystemName.producer.bootstrap.servers=localhost:9092
+systems.testSystemName.consumer.zookeeper.connect=localhost:2181
+
+## Zookeeper coordination properties
+job.coordinator.zk.connect=localhost:2181
+job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
+task.shutdown.ms=4000
+job.debounce.time.ms=4000
+job.coordinator.zk.consensus.timeout.ms=4000
+job.coordinator.zk.session.timeout.ms=4000
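+## Note: the group coordination wait time used by the python failure tests is expected to cover
+## the debounce, consensus and session timeouts configured above.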
+
+job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.GroupByPartitionFactory
+task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
\ No newline at end of file
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java
new file mode 100644
index 0000000000..e8be592e03
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.integration;
+
+import joptsimple.OptionSet;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunnerMain;
+import org.apache.samza.runtime.ApplicationRunnerOperation;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.runtime.ApplicationRunnerMain.STREAM_APPLICATION_CLASS_CONFIG;
+
+/**
+ * {@link ApplicationRunnerMain} was designed for deploying a {@link StreamApplication} on yarn
+ * and doesn't work in standalone mode.
+ *
+ * This runner class is built for the standalone failure tests and is not recommended for general use.
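+ *
+ * Example invocation (mirrors the start command used in stream_processor.py):
+ * <pre>
+ *   ./bin/run-class.sh org.apache.samza.test.integration.LocalApplicationRunnerMain \
+ *     --config-path ./config/standalone.failure.test.properties --operation run --config processor.id=processor-1
+ * </pre>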
+ */
+public class LocalApplicationRunnerMain {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(LocalApplicationRunnerMain.class);
+
+ public static void main(String[] args) throws Exception {
+ ApplicationRunnerMain.ApplicationRunnerCommandLine cmdLine = new ApplicationRunnerMain.ApplicationRunnerCommandLine();
+ OptionSet options = cmdLine.parser().parse(args);
+ Config orgConfig = cmdLine.loadConfig(options);
+ Config config = Util.rewriteConfig(orgConfig);
+
+ ApplicationRunner runner = ApplicationRunner.fromConfig(config);
+ StreamApplication app = (StreamApplication) Class.forName(config.get(STREAM_APPLICATION_CLASS_CONFIG)).newInstance();
+
+ ApplicationRunnerOperation op = cmdLine.getOperation(options);
+
+ try {
+      LOGGER.info("Launching stream application: {}.", app);
+ runner.run(app);
+ runner.waitForFinish();
+ } catch (Exception e) {
+ LOGGER.error("Exception occurred when invoking: {} on application: {}.", op, app, e);
+ }
+ }
+}
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
new file mode 100644
index 0000000000..f6e3d5fe6e
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.integration;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Acts as a pass-through filter for all the events from an input stream.
+ */
+public class TestStandaloneIntegrationApplication implements StreamApplication {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TestStandaloneIntegrationApplication.class);
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+ String inputStream = config.get("input.stream.name");
+    String outputStreamName = "standalone_integration_test_kafka_output_topic";
+    LOGGER.info("Forwarding messages from: {} to: {}.", inputStream, outputStreamName);
+ graph.getInputStream(inputStream).sendTo(graph.getOutputStream(outputStreamName));
+ }
+}
diff --git a/samza-test/src/main/python/configs/kafka.json b/samza-test/src/main/python/configs/kafka.json
index 91fe23eaad..14b21372c4 100644
--- a/samza-test/src/main/python/configs/kafka.json
+++ b/samza-test/src/main/python/configs/kafka.json
@@ -3,7 +3,7 @@
"kafka_instance_0": "localhost"
},
"kafka_port": 9092,
- "kafka_start_cmd": "kafka_2.10-0.10.1.1/bin/kafka-server-start.sh -daemon kafka_2.10-0.10.1.1/config/server.properties",
+ "kafka_start_cmd": "kafka_2.10-0.10.1.1/bin/kafka-server-start.sh -daemon kafka_2.10-0.10.1.1/config/server.properties --override delete.topic.enable=true",
"kafka_stop_cmd": "kafka_2.10-0.10.1.1/bin/kafka-server-stop.sh",
"kafka_install_path": "deploy/kafka",
"kafka_executable": "kafka_2.10-0.10.1.1.tgz",
diff --git a/samza-test/src/main/python/deployment.py b/samza-test/src/main/python/deployment.py
index 89ba72891d..7cd3cacb4c 100644
--- a/samza-test/src/main/python/deployment.py
+++ b/samza-test/src/main/python/deployment.py
@@ -72,9 +72,7 @@ def setup_suite():
runtime.set_deployer(name, deployer)
for instance, host in c(name + '_hosts').iteritems():
logger.info('Deploying {0} on host: {1}'.format(instance, host))
- deployer.deploy(instance, {
- 'hostname': host
- })
+ deployer.start(instance, {'hostname': host})
# Setup Samza job deployer.
samza_job_deployer = SamzaJobYarnDeployer({
diff --git a/samza-test/src/main/python/requirements.txt b/samza-test/src/main/python/requirements.txt
index cf43a23620..e39538e445 100644
--- a/samza-test/src/main/python/requirements.txt
+++ b/samza-test/src/main/python/requirements.txt
@@ -19,3 +19,4 @@ zopkio==0.2.5
requests
kafka-python==1.3.3
Jinja2
+kazoo==2.5
\ No newline at end of file
diff --git a/samza-test/src/main/python/standalone_deployment.py b/samza-test/src/main/python/standalone_deployment.py
new file mode 100644
index 0000000000..cd038efb1c
--- /dev/null
+++ b/samza-test/src/main/python/standalone_deployment.py
@@ -0,0 +1,123 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import zopkio.adhoc_deployer as adhoc_deployer
+from zopkio.runtime import get_active_config as c
+from subprocess import PIPE, Popen
+import logging
+import time
+import urllib
+import os
+
+TEST_INPUT_TOPIC = 'standalone_integration_test_kafka_input_topic'
+TEST_OUTPUT_TOPIC = 'standalone_integration_test_kafka_output_topic'
+
+logger = logging.getLogger(__name__)
+deployers = {}
+
+def setup_suite():
+ """
+ Setup method that will be run once by zopkio test_runner before all the integration tests.
+ """
+  ## Download and deploy zookeeper and kafka. Configurations for kafka and zookeeper are defined in kafka.json and zookeeper.json.
+ _download_components(['zookeeper', 'kafka'])
+
+ _deploy_components(['zookeeper', 'kafka'])
+
+  ## Recreate the input and output topics so that each test run starts with empty topics.
+ for topic in [TEST_INPUT_TOPIC, TEST_OUTPUT_TOPIC]:
+ logger.info("Deleting topic: {0}.".format(topic))
+ _delete_kafka_topic('localhost:2181', topic)
+ logger.info("Creating topic: {0}.".format(topic))
+ _create_kafka_topic('localhost:2181', topic, 3, 1)
+
+def _download_components(components):
+ """
+  Download each of the :param components into the working directory, if not already present, using the url_<component> entries defined in config.
+ """
+
+ for component in components:
+ url_key = 'url_{0}'.format(component)
+ url = c(url_key)
+ filename = os.path.basename(url)
+ if os.path.exists(filename):
+ logger.debug('Using cached file: {0}.'.format(filename))
+ else:
+ logger.info('Downloading {0} from {1}.'.format(component, url))
+ urllib.urlretrieve(url, filename)
+
+def _deploy_components(components):
+ """
+  Install and start all the :param components using the binaries downloaded into the working directory.
+ """
+
+ global deployers
+
+ for component in components:
+ config = {
+ 'install_path': os.path.join(c('remote_install_path'), c(component + '_install_path')),
+ 'executable': c(component + '_executable'),
+ 'post_install_cmds': c(component + '_post_install_cmds', []),
+ 'start_command': c(component + '_start_cmd'),
+ 'stop_command': c(component + '_stop_cmd'),
+ 'extract': True,
+ 'sync': True,
+ }
+ deployer = adhoc_deployer.SSHDeployer(component, config)
+ deployers[component] = deployer
+ for instance, host in c(component + '_hosts').iteritems():
+ logger.info('Deploying {0} on host: {1}'.format(instance, host))
+ deployer.start(instance, {'hostname': host})
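+      ## Give the component a few seconds to come up before deploying the next one.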
+ time.sleep(5)
+
+def _create_kafka_topic(zookeeper_servers, topic_name, partition_count, replication_factor):
+ """
+  :param zookeeper_servers: Comma separated list of zookeeper servers backing the kafka cluster.
+ :param topic_name: name of kafka topic to create.
+ :param partition_count: Number of partitions of the kafka topic.
+ :param replication_factor: Replication factor of the kafka topic.
+ """
+
+  ### Use the command line utility to create the kafka topic, since the kafka python API doesn't support configuring the partition count during topic creation.
+ base_dir = os.getcwd()
+ create_topic_command = 'sh {0}/deploy/kafka/kafka_2.10-0.10.1.1/bin/kafka-topics.sh --create --zookeeper {1} --replication-factor {2} --partitions {3} --topic {4}'.format(base_dir, zookeeper_servers, replication_factor, partition_count, topic_name)
+ p = Popen(create_topic_command.split(' '), stdin=PIPE, stdout=PIPE, stderr=PIPE)
+ output, err = p.communicate()
+ logger.info("Output from create kafka topic: {0}\nstdout: {1}\nstderr: {2}".format(topic_name, output, err))
+
+def _delete_kafka_topic(zookeeper_servers, topic_name):
+ """
+ Delete kafka topic defined by the method parameters.
+
+  :param zookeeper_servers: Comma separated list of zookeeper servers backing the kafka cluster.
+ :param topic_name: name of kafka topic to delete.
+ """
+
+ base_dir = os.getcwd()
+ delete_topic_command = 'sh {0}/deploy/kafka/kafka_2.10-0.10.1.1/bin/kafka-topics.sh --delete --zookeeper {1} --topic {2}'.format(base_dir, zookeeper_servers, topic_name)
+ logger.info("Deleting topic: {0}.".format(topic_name))
+ p = Popen(delete_topic_command.split(' '), stdin=PIPE, stdout=PIPE, stderr=PIPE)
+ output, err = p.communicate()
+ logger.info("Output from delete kafka topic: {0}\nstdout: {1}\nstderr: {2}".format(topic_name, output, err))
+
+def teardown_suite():
+ """
+ Teardown method that will be run once by zopkio test_runner after all the integration tests.
+ """
+ for component in ['kafka', 'zookeeper']:
+ deployer = deployers[component]
+ for instance, host in c(component + '_hosts').iteritems():
+ deployer.undeploy(instance)
diff --git a/samza-test/src/main/python/standalone_integration_tests.py b/samza-test/src/main/python/standalone_integration_tests.py
new file mode 100644
index 0000000000..8405413613
--- /dev/null
+++ b/samza-test/src/main/python/standalone_integration_tests.py
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# 'License'); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+dir = os.path.dirname(os.path.abspath(__file__))
+
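+## zopkio test descriptor: deployment_code points at the suite setup/teardown module, perf_code
+## at the perf/log collection settings, and test_code lists the modules containing the tests.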
+test = {
+ 'deployment_code': os.path.join(dir, 'standalone_deployment.py'),
+ 'perf_code': os.path.join(dir, 'perf.py'),
+ 'configs_directory': os.path.join(dir, 'configs'),
+ 'test_code': [
+ os.path.join(dir, 'tests', 'standalone_failure_tests.py'),
+ ],
+}
diff --git a/samza-test/src/main/python/stream_processor.py b/samza-test/src/main/python/stream_processor.py
new file mode 100644
index 0000000000..a7a2bc8768
--- /dev/null
+++ b/samza-test/src/main/python/stream_processor.py
@@ -0,0 +1,121 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import os
+import time
+import zopkio.adhoc_deployer as adhoc_deployer
+from zopkio.remote_host_helper import get_ssh_client, exec_with_env
+import zopkio.runtime as runtime
+
+logger = logging.getLogger(__name__)
+
+class StreamProcessor:
+ """
+  Represents a standalone StreamProcessor that uses zookeeper for coordination. Used in standalone failure tests
+  to manage the lifecycle of the linux process (start, kill, pause, resume) associated with the StreamProcessor.
+ """
+
+ def __init__(self, host_name, processor_id):
+ """
+    :param host_name: Represents the host name on which this StreamProcessor will run.
+    :param processor_id: Represents the processor_id of the StreamProcessor.
+ """
+ start_cmd = 'export SAMZA_LOG_DIR=\"deploy/{0}\"; export JAVA_OPTS=\"$JAVA_OPTS -Xmx2G\"; ./bin/run-class.sh org.apache.samza.test.integration.LocalApplicationRunnerMain --config-path ./config/standalone.failure.test.properties --operation run --config processor.id={0} >> /tmp/{0}.log &'
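+    ## {0} in the start command expands to the processor id, which is reused as the SAMZA_LOG_DIR
+    ## suffix, the processor.id config override and the log file name under /tmp.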
+ self.username = runtime.get_username()
+ self.password = runtime.get_password()
+ self.processor_id = processor_id
+ self.host_name = host_name
+ self.processor_start_command = start_cmd.format(self.processor_id)
+ logger.info('Running processor start command: {0}'.format(self.processor_start_command))
+ self.deployment_config = {
+ 'install_path': os.path.join(runtime.get_active_config('remote_install_path'), 'deploy/{0}'.format(self.processor_id)),
+ 'executable': 'samza-test_2.11-0.15.0-SNAPSHOT.tgz',
+ 'post_install_cmds': [],
+ 'start_command': self.processor_start_command,
+ 'stop_command': '',
+ 'extract': True,
+ 'sync': True,
+ }
+ self.deployer = adhoc_deployer.SSHDeployer(self.processor_id, self.deployment_config)
+
+ def start(self):
+ """
+    Submits the StreamProcessor for execution on host: self.host_name.
+ """
+ logger.info("Starting processor with id: {0}.".format(self.processor_id))
+ self.deployer.start(self.processor_id, {'hostname': self.host_name})
+
+ def get_processor_id(self):
+ """
+ Returns the processorId of the StreamProcessor.
+ """
+ return self.processor_id
+
+ def kill(self):
+ """
+ Kills the StreamProcessor process through SIGKILL signal.
+ """
+ self.__send_signal_to_processor("SIGKILL")
+
+ def pause(self):
+ """
+ Pauses the StreamProcessor process through SIGSTOP signal.
+ """
+ self.__send_signal_to_processor("SIGSTOP")
+
+ def resume(self):
+ """
+ Resumes the stream processor process through SIGCONT signal.
+ """
+ self.__send_signal_to_processor("SIGCONT")
+
+ def __send_signal_to_processor(self, signal):
+ """
+    Sends the given :param signal to every linux process associated with the StreamProcessor.
+ """
+ linux_process_pids = self.__get_pid()
+ for linux_process_pid in linux_process_pids:
+ command = "kill -{0} {1}".format(signal, linux_process_pid)
+ result = self.__execute_command(command)
+ logger.info("Result of {0} is: {1}.".format(command, result))
+
+ def __get_pid(self):
+ """
+ Determines the linux process id associated with this StreamProcessor.
+ """
+ ps_command = "ps aux | grep '{0}' | grep -v grep | tr -s ' ' | cut -d ' ' -f 2 | grep -Eo '[0-9]+'".format(self.processor_id)
+ non_failing_command = "{0}; if [ $? -le 1 ]; then true; else false; fi;".format(ps_command)
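+    ## grep exits with 1 when no matching process is found; treating exit codes 0 and 1 as success
+    ## makes an absent processor yield an empty pid list instead of a failed remote command.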
+ logger.info("Executing command: {0}.".format(non_failing_command))
+ full_output = self.__execute_command(non_failing_command)
+ pids = []
+ if len(full_output) > 0:
+ pids = [int(pid_str) for pid_str in full_output.split('\n') if pid_str.isdigit()]
+ return pids
+
+ def __execute_command(self, command):
+ """
+ Executes the :param command on host: self.host_name.
+ """
+ with get_ssh_client(self.host_name, username=self.username, password=self.password) as ssh:
+ chan = exec_with_env(ssh, command, msg="Failed to get PID", env={})
+ execution_result = ''
+ while True:
+ result_buffer = chan.recv(16)
+ if len(result_buffer) == 0:
+ break
+ execution_result += result_buffer
+ return execution_result
diff --git a/samza-test/src/main/python/tests/standalone_failure_tests.py b/samza-test/src/main/python/tests/standalone_failure_tests.py
new file mode 100644
index 0000000000..0fab7421bf
--- /dev/null
+++ b/samza-test/src/main/python/tests/standalone_failure_tests.py
@@ -0,0 +1,311 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import util
+import sys
+import logging
+from kafka import SimpleProducer
+import time
+import traceback
+from stream_processor import StreamProcessor
+from zk_client import ZkClient
+import threading
+
+logger = logging.getLogger(__name__)
+NUM_MESSAGES = 50
+GROUP_COORDINATION_TIMEOUT = 14
+TEST_OUTPUT_TOPIC = 'standalone_integration_test_kafka_output_topic'
+zk_client = None
+
+### TODO: In each test, add barrier state and processorId validations after fixing the data serialization format in zookeeper (SAMZA-1749).
+def __purge_zk_data():
+ """
+ Recursively deletes all data nodes created in zookeeper in a test-run.
+ """
+ zk_client.purge_all_nodes()
+
+def __pump_messages_into_input_topic():
+ """
+  Produce NUM_MESSAGES messages into the input topic: standalone_integration_test_kafka_input_topic.
+ """
+ kafka_client = None
+ input_topic = 'standalone_integration_test_kafka_input_topic'
+ try:
+ kafka_client = util.get_kafka_client()
+ kafka_client.ensure_topic_exists(input_topic)
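+    ## ACK_AFTER_CLUSTER_COMMIT makes the producer wait for the full ISR to commit each message,
+    ## so the test input is durable before any processor is killed mid-test.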
+ producer = SimpleProducer(kafka_client, async=False, req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT, ack_timeout=30000)
+ logger.info('Producing {0} messages to topic: {1}'.format(NUM_MESSAGES, input_topic))
+ for message_index in range(1, NUM_MESSAGES + 1):
+ producer.send_messages(input_topic, str(message_index))
+ except:
+ logger.error(traceback.format_exc(sys.exc_info()))
+ finally:
+ if kafka_client is not None:
+ kafka_client.close()
+
+def __setup_processors():
+ """
+  Instantiates and schedules three stream processors for execution on localhost.
+  :return: a dict of processor_id to the instantiated StreamProcessor.
+ """
+ processors = {}
+ for processor_id in ['standalone-processor-1', 'standalone-processor-2', 'standalone-processor-3']:
+ processors[processor_id] = StreamProcessor(host_name='localhost', processor_id=processor_id)
+ processors[processor_id].start()
+ return processors
+
+def __tear_down_processors(processors):
+ """
+  Kills all the stream processors passed in :param processors.
+ """
+ for processor_id, processor in processors.iteritems():
+ logger.info("Killing processor: {0}.".format(processor_id))
+ processor.kill()
+
+def __setup_zk_client():
+ """
+  Instantiates a ZkClient to connect to the zookeeper server on localhost.
+ """
+ global zk_client
+ zk_client = ZkClient(zookeeper_host='127.0.0.1', zookeeper_port='2181', app_name='test-app-name', app_id='test-app-id')
+ zk_client.start()
+
+def __teardown_zk_client():
+ """
+ Stops the ZkClient.
+ """
+ global zk_client
+ zk_client.stop()
+
+def job_model_watcher(event, expected_processors):
+  """
+  Polls the latest JobModel from zookeeper until it contains exactly the
+  :param expected_processors (or 30 seconds elapse) and then sets :param event.
+  """
+  start_time_seconds = time.time()
+  elapsed_time_seconds = 0
+  while elapsed_time_seconds <= 30:
+    recent_job_model = zk_client.get_latest_job_model()
+    if set(recent_job_model.get('containers', {}).keys()) == set(expected_processors):
+      event.set()
+      return
+    else:
+      time.sleep(2)
+      elapsed_time_seconds = int(time.time() - start_time_seconds)
+
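+## The JobModel json read from zookeeper is assumed to have the (abridged) shape:
+## {"containers": {"<processor_id>": {"tasks": {"<task_name>": {"system-stream-partitions": [
+##   {"system": "testSystemName", "stream": "standalone_integration_test_kafka_input_topic", "partition": 0}]}}}}}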
+def __validate_job_model(job_model, killed_processors=[]):
+ ## Validate the TaskModel. Check if all the partitions are assigned to the containers.
+ expected_ssps = [{u'partition': 0, u'system': u'testSystemName', u'stream': u'standalone_integration_test_kafka_input_topic'},
+ {u'partition': 1, u'system': u'testSystemName', u'stream': u'standalone_integration_test_kafka_input_topic'},
+ {u'partition': 2, u'system': u'testSystemName', u'stream': u'standalone_integration_test_kafka_input_topic'}]
+ actual_ssps = []
+ for container_id, tasks in job_model['containers'].iteritems():
+ for partition, ssps in tasks['tasks'].iteritems():
+ actual_ssps.append(ssps['system-stream-partitions'][0])
+ actual_ssps.sort()
+ assert expected_ssps == actual_ssps, 'Expected ssp: {0}, Actual ssp: {1}.'.format(expected_ssps, actual_ssps)
+
+ ## Validate the ContainerModel. Live processors should be present in the JobModel and killed processors should not be in JobModel.
+ active_processors = zk_client.get_active_processors()
+  assert set(active_processors) == set(job_model['containers'].keys()), 'Active processors: {0} do not match the JobModel containers: {1}.'.format(active_processors, job_model['containers'].keys())
+ for processor_id in killed_processors:
+ assert processor_id not in job_model['containers'], 'Processor: {0} exists in JobModel: {1}.'.format(processor_id, job_model)
+
+def __get_job_model(expected_processors):
+  event = threading.Event()
+  ## Run job_model_watcher in a background thread: calling it inline (as a watch argument)
+  ## would block here and register its None return value as the zookeeper watch.
+  watcher_thread = threading.Thread(target=job_model_watcher, kwargs={'event': event, 'expected_processors': expected_processors})
+  watcher_thread.start()
+  event.wait(2 * GROUP_COORDINATION_TIMEOUT)
+  return zk_client.get_latest_job_model()
+
+def test_kill_leader():
+ """
+ Launches three stream processors. Kills the leader processor. Waits till the group coordination timeout
+ and verifies that the final JobModel contains both the followers.
+ """
+ processors = {}
+ try:
+ __setup_zk_client()
+ __pump_messages_into_input_topic()
+ processors = __setup_processors()
+
+ ## Validations before killing the leader.
+ job_model = __get_job_model(expected_processors=processors.keys())
+ __validate_job_model(job_model, [])
+
+ leader_processor_id = zk_client.get_leader_processor_id()
+ processors.pop(leader_processor_id).kill()
+
+ ## Validations after killing the leader.
+ job_model = __get_job_model(expected_processors=processors.keys())
+ assert leader_processor_id != zk_client.get_leader_processor_id(), '{0} is still the leader'.format(leader_processor_id)
+ __validate_job_model(job_model, [leader_processor_id])
+ except:
+ ## Explicitly logging exception, since zopkio doesn't log complete stacktrace.
+ logger.error(traceback.format_exc(sys.exc_info()))
+ raise
+ finally:
+ __tear_down_processors(processors)
+ __purge_zk_data()
+ __teardown_zk_client()
+
+def test_kill_one_follower():
+ """
+ Launches three stream processors. Kills one follower processor. Waits till the group coordination timeout and
+ verifies that the final JobModel contains the leader processor and un-killed follower processor.
+ """
+ processors = {}
+ try:
+ __setup_zk_client()
+ __pump_messages_into_input_topic()
+ processors = __setup_processors()
+
+    ## Validations before killing the follower.
+    job_model = __get_job_model(expected_processors=processors.keys())
+    __validate_job_model(job_model)
+
+    ## Fetch the leader only after the first JobModel has formed, so that leader election has completed.
+    leader_processor_id, killed_processors = zk_client.get_leader_processor_id(), []
+
+    for processor_id in processors.keys():
+      if processor_id != leader_processor_id:
+        follower = processors.pop(processor_id)
+        follower.kill()
+        killed_processors.append(processor_id)
+        break
+
+ ## Validations after killing the follower.
+ job_model = __get_job_model(expected_processors=processors.keys())
+ assert leader_processor_id == zk_client.get_leader_processor_id(), '{0} is not the leader'.format(leader_processor_id)
+ __validate_job_model(job_model, killed_processors)
+ except:
+ ## Explicitly logging exception, since zopkio doesn't log complete stacktrace.
+ logger.error(traceback.format_exc(sys.exc_info()))
+ raise
+ finally:
+ __tear_down_processors(processors)
+ __purge_zk_data()
+ __teardown_zk_client()
+
+def test_kill_multiple_followers():
+ """
+ Launches three stream processors. Kills both the follower processors. Waits for group coordination timeout
+ and verifies that the final JobModel contains only the leader processor.
+ """
+ processors = {}
+ try:
+ __setup_zk_client()
+ __pump_messages_into_input_topic()
+ processors = __setup_processors()
+
+ ## Validations before killing the followers.
+ job_model = __get_job_model(expected_processors=processors.keys())
+ __validate_job_model(job_model)
+
+ leader_processor_id, killed_processors = zk_client.get_leader_processor_id(), []
+
+ for processor_id in processors.keys():
+ if processor_id != leader_processor_id:
+ follower = processors.pop(processor_id)
+        killed_processors.append(processor_id)
+ follower.kill()
+
+ ## Validations after killing the followers.
+ job_model = __get_job_model(expected_processors=processors.keys())
+ __validate_job_model(job_model, killed_processors)
+ except:
+ ## Explicitly logging exception, since zopkio doesn't log complete stacktrace.
+ logger.error(traceback.format_exc(sys.exc_info()))
+ raise
+ finally:
+ __tear_down_processors(processors)
+ __purge_zk_data()
+ __teardown_zk_client()
+
+def test_kill_leader_and_a_follower():
+ """
+  Launches three stream processors. Kills the leader and one of the follower processors.
+ Waits till the group coordination timeout and verifies that the final JobModel contains only one processor.
+ """
+ processors = {}
+ try:
+ __setup_zk_client()
+ __pump_messages_into_input_topic()
+ processors = __setup_processors()
+
+    ## Validations before killing the leader and follower.
+    job_model = __get_job_model(expected_processors=processors.keys())
+    __validate_job_model(job_model)
+
+    ## Fetch the leader only after the first JobModel has formed, so that leader election has completed.
+    leader_processor_id = zk_client.get_leader_processor_id()
+
+ killed_processors = [leader_processor_id]
+ processors.pop(leader_processor_id).kill()
+
+ for processor_id in processors.keys():
+ follower = processors.pop(processor_id)
+ killed_processors.append(processor_id)
+ follower.kill()
+ break
+
+ ## Validations after killing the leader and follower.
+ job_model = __get_job_model(expected_processors=processors.keys())
+ __validate_job_model(job_model, killed_processors)
+ except:
+ ## Explicitly logging exception, since zopkio doesn't log complete stacktrace.
+ logger.error(traceback.format_exc(sys.exc_info()))
+ raise
+ finally:
+ __tear_down_processors(processors)
+ __purge_zk_data()
+ __teardown_zk_client()
+
+def test_pause_resume_leader():
+ """
+  Launches three processors. Pauses the leader processor, waits till the group coordination timeout and verifies that
+  the JobModel doesn't contain the leader processor. Resumes the leader processor, waits till the group coordination
+  timeout and verifies that the new JobModel contains the previously paused leader processor.
+ """
+ processors = {}
+ try:
+ __setup_zk_client()
+ __pump_messages_into_input_topic()
+ processors = __setup_processors()
+
+ ## Validations before pausing the leader.
+ job_model = __get_job_model(expected_processors=processors.keys())
+ __validate_job_model(job_model)
+
+ leader_processor_id = zk_client.get_leader_processor_id()
+ leader = processors.pop(leader_processor_id)
+
+ logger.info("Pausing the leader processor: {0}.".format(leader_processor_id))
+ leader.pause()
+
+ ## Validations after pausing the leader.
+ job_model = __get_job_model(expected_processors=processors.keys())
+ __validate_job_model(job_model, [leader_processor_id])
+
+ logger.info("Resuming the leader processor: {0}.".format(leader_processor_id))
+ leader.resume()
+
+ ## Validations after resuming the leader.
+ job_model = __get_job_model(expected_processors=processors.keys())
+ __validate_job_model(job_model)
+
+ leader.kill()
+ except:
+ ## Explicitly logging exception, since zopkio doesn't log complete stacktrace.
+ logger.error(traceback.format_exc(sys.exc_info()))
+ raise
+ finally:
+ __tear_down_processors(processors)
+ __purge_zk_data()
+ __teardown_zk_client()
diff --git a/samza-test/src/main/python/tests/zk_client.py b/samza-test/src/main/python/tests/zk_client.py
new file mode 100644
index 0000000000..2a11a80406
--- /dev/null
+++ b/samza-test/src/main/python/tests/zk_client.py
@@ -0,0 +1,129 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from kazoo.client import KazooClient
+import logging
+import sys
+import traceback
+
+logger = logging.getLogger(__name__)
+
+class ZkClient:
+  """
+  Wrapper class over KazooClient. Provides utility methods for standalone failure tests to get details about
+  the processor group state stored in zookeeper.
+  """
+
+  def __init__(self, zookeeper_host, zookeeper_port, app_name, app_id):
+    """
+    Instantiates a kazoo client to connect to the zookeeper server at :param zookeeper_host::param zookeeper_port.
+    """
+    self.kazoo_client = KazooClient(hosts='{0}:{1}'.format(zookeeper_host, zookeeper_port))
+    self.zk_base_node = 'app-{0}-{1}/{2}-{3}-coordinationData'.format(app_name, app_id, app_name, app_id)
+
+ def start(self):
+ """
+ Establishes connection with the zookeeper server at self.host_name:self.port.
+ """
+ self.kazoo_client.start()
+
+ def stop(self):
+ """
+ Closes and releases the connection held with the zookeeper server.
+ """
+ self.kazoo_client.stop()
+
+ def watch_job_model(self, watch_function):
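+    """
+    Registers :param watch_function to be invoked once when the children of the jobModels zookeeper node change.
+    """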
+ self.kazoo_client.ensure_path('{0}/JobModelGeneration/jobModels/'.format(self.zk_base_node))
+ self.kazoo_client.get_children('{0}/JobModelGeneration/jobModels/'.format(self.zk_base_node), watch=watch_function)
+
+ def get_latest_job_model(self):
+ """
+ Reads and returns the latest JobModel from zookeeper.
+ """
+ job_model_dict = {}
+ try:
+ childZkNodes = self.kazoo_client.get_children('{0}/JobModelGeneration/jobModels/'.format(self.zk_base_node))
+ if len(childZkNodes) > 0:
+ childZkNodes.sort()
+ childZkNodes.reverse()
+
+ job_model_generation_path = '{0}/JobModelGeneration/jobModels/{1}/'.format(self.zk_base_node, childZkNodes[0])
+ job_model, _ = self.kazoo_client.get(job_model_generation_path)
+
+ """
+ ZkClient java library stores the data in the following format in zookeeper:
+ class_name, data_length, actual_data
+
+        JobModel json manipulation: delete all the characters before the first occurrence of '{' in the jobModel json string.
+
+ Normal json deserialization without the above custom string massaging fails. This will be removed after SAMZA-1749.
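+
+        For example, a (hypothetical) raw znode payload of the form:
+            org.apache.samza.job.model.JobModel 123 {"containers": {...}}
+        is reduced to the substring starting at the first '{' before json deserialization.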
+ """
+
+ first_curly_brace_index = job_model.find('{')
+ job_model = job_model[first_curly_brace_index: ]
+ job_model_dict = json.loads(job_model)
+ logger.info("Recent JobModel in zookeeper: {0}".format(job_model_dict))
+ except:
+ logger.error(traceback.format_exc(sys.exc_info()))
+ return job_model_dict
+
+ def get_leader_processor_id(self):
+ """
+    Determines the processorId of the current leader in a processor group.
+
+ Returns the processorId of the leader if leader exists.
+ Returns None otherwise.
+ """
+ leader_processor_id = None
+ try:
+ processors_path = '{0}/processors'.format(self.zk_base_node)
+ childZkNodes = self.kazoo_client.get_children(processors_path)
+ childZkNodes.sort()
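+      ## Processor znodes are sequential; after sorting, the first entry (lowest sequence
+      ## number) is assumed to belong to the current leader of the group.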
+ child_processor_path = '{0}/{1}'.format(processors_path, childZkNodes[0])
+ processor_data, _ = self.kazoo_client.get(child_processor_path)
+ host, leader_processor_id = processor_data.split(" ")
+ except:
+ logger.error(traceback.format_exc(sys.exc_info()))
+ return leader_processor_id
+
+ def purge_all_nodes(self):
+ """
+ Recursively delete all zookeeper nodes from the base node: self.zk_base_node.
+ """
+ try:
+ self.kazoo_client.delete(path=self.zk_base_node, version=-1, recursive=True)
+ except:
+ logger.error(traceback.format_exc(sys.exc_info()))
+
+ def get_active_processors(self):
+ """
+ Determines the processor ids that are active in zookeeper.
+ """
+ processor_ids = []
+ try:
+ processors_path = '{0}/processors'.format(self.zk_base_node)
+ childZkNodes = self.kazoo_client.get_children(processors_path)
+ childZkNodes.sort()
+
+ for childZkNode in childZkNodes:
+ child_processor_path = '{0}/{1}'.format(processors_path, childZkNode)
+ processor_data, _ = self.kazoo_client.get(child_processor_path)
+ host, processor_id = processor_data.split(" ")
+ processor_ids.append(processor_id)
+ except:
+ logger.error(traceback.format_exc(sys.exc_info()))
+ return processor_ids