From 8db964f11ada036fd59f1a2573f42e932e321a66 Mon Sep 17 00:00:00 2001 From: Shanthoosh Venkataraman Date: Sat, 9 Jun 2018 16:51:47 -0700 Subject: [PATCH 1/5] Standalone failure tests. --- README.md | 8 +- bin/integration-tests.sh | 17 +- .../config/standalone.failure.test.properties | 45 +++ .../LocalApplicationRunnerMain.java | 63 ++++ .../PassThroughStreamApplication.java | 42 +++ samza-test/src/main/python/configs/kafka.json | 2 +- samza-test/src/main/python/deployment.py | 4 +- samza-test/src/main/python/requirements.txt | 1 + .../src/main/python/standalone_deployment.py | 126 +++++++ .../python/standalone_integration_tests.py | 29 ++ .../src/main/python/stream_processor.py | 121 +++++++ .../python/tests/standalone_failure_tests.py | 307 ++++++++++++++++++ samza-test/src/main/python/tests/zk_client.py | 130 ++++++++ 13 files changed, 885 insertions(+), 10 deletions(-) create mode 100644 samza-test/src/main/config/standalone.failure.test.properties create mode 100644 samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java create mode 100644 samza-test/src/main/java/org/apache/samza/test/integration/PassThroughStreamApplication.java create mode 100644 samza-test/src/main/python/standalone_deployment.py create mode 100644 samza-test/src/main/python/standalone_integration_tests.py create mode 100644 samza-test/src/main/python/stream_processor.py create mode 100644 samza-test/src/main/python/tests/standalone_failure_tests.py create mode 100644 samza-test/src/main/python/tests/zk_client.py diff --git a/README.md b/README.md index a431281d3a..1a7fdd9f90 100644 --- a/README.md +++ b/README.md @@ -46,9 +46,13 @@ To run key-value performance tests: ./gradlew samza-shell:kvPerformanceTest -PconfigPath=file://$PWD/samza-test/src/main/config/perf/kv-perf.properties -To run all integration tests: +To run yarn integration tests: - ./bin/integration-tests.sh + ./bin/integration-tests.sh yarn-integration-tests + +To run standalone integration 
tests: + + ./bin/integration-tests.sh standalone-integration-tests ### Running checkstyle on the java code ### diff --git a/bin/integration-tests.sh b/bin/integration-tests.sh index 14fcd1c030..248236fd34 100755 --- a/bin/integration-tests.sh +++ b/bin/integration-tests.sh @@ -19,6 +19,7 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" BASE_DIR=$DIR/.. TEST_DIR=$1 +FAILURE_TEST_TYPE=$2 if test -z "$TEST_DIR"; then echo @@ -70,17 +71,25 @@ source $SAMZA_INTEGRATION_TESTS_DIR/bin/activate # install zopkio and requests pip install -r $SCRIPTS_DIR/requirements.txt -# treat all trailing parameters (after dirname) as zopkio switches +# treat all trailing parameters (after dirname, test_type) as zopkio switches shift -SWITCHES="$*" +SWITCHES="${*:2}" # default to info-level debugging if not specified if [[ $SWITCHES != *"console-log-level"* ]]; then SWITCHES="$SWITCHES --console-log-level INFO" fi -# run the tests -zopkio --config-overrides remote_install_path=$ABS_TEST_DIR $SWITCHES $SCRIPTS_DIR/integration_tests.py +if [[ ${FAILURE_TEST_TYPE} == "yarn-integration-tests" ]]; then + echo "Running yarn integration tests." + zopkio --config-overrides remote_install_path=$ABS_TEST_DIR $SWITCHES $SCRIPTS_DIR/integration_tests.py +elif [[ ${FAILURE_TEST_TYPE} == "standalone-integration-tests" ]]; then + echo "Running standalone integration tests." + zopkio --config-overrides remote_install_path=$ABS_TEST_DIR $SWITCHES $SCRIPTS_DIR/standalone_integration_tests.py +else + echo "Invalid failure test type: $FAILURE_TEST_TYPE" + exit 1 +fi # go back to execution directory deactivate diff --git a/samza-test/src/main/config/standalone.failure.test.properties b/samza-test/src/main/config/standalone.failure.test.properties new file mode 100644 index 0000000000..a5150c5316 --- /dev/null +++ b/samza-test/src/main/config/standalone.failure.test.properties @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +app.runner.class=org.apache.samza.runtime.LocalApplicationRunner + +app.class=org.apache.samza.test.integration.PassThroughStreamApplication + +app.name=test-app-name +app.id=test-app-id +job.name=test-app-name +job.id=test-app-id + +## Kafka I/O system properties. +task.inputs=standaloneIntegrationTestKafkaInputTopic +input.stream.name=standaloneIntegrationTestKafkaInputTopic +job.default.system=testSystemName +systems.testSystemName.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory +systems.testSystemName.producer.bootstrap.servers=localhost:9092 +systems.testSystemName.consumer.zookeeper.connect=localhost:2181 + +## Zookeeper coordination properties +job.coordinator.zk.connect=localhost:2181 +job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory +task.shutdown.ms=4000 +job.debounce.time.ms=4000 +job.coordinator.zk.consensus.timeout.ms=4000 +job.coordinator.zk.session.timeout.ms=4000 + +job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.GroupByPartitionFactory +task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory \ No newline at end of file diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java 
b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java new file mode 100644 index 0000000000..cb5521a2b2 --- /dev/null +++ b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.integration; + +import joptsimple.OptionSet; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.runtime.ApplicationRunnerMain; +import org.apache.samza.runtime.ApplicationRunnerOperation; +import org.apache.samza.util.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.samza.runtime.ApplicationRunnerMain.STREAM_APPLICATION_CLASS_CONFIG; + +/** + * {@link ApplicationRunnerMain} was designed for deploying {@link StreamApplication} in yarn + * and doesn't work in standalone. + * + * This runner class is for standalone failure tests and not recommended for general use. 
+ */ +public class LocalApplicationRunnerMain { + + private static final Logger LOGGER = LoggerFactory.getLogger(LocalApplicationRunnerMain.class); + + public static void main(String[] args) throws Exception { + ApplicationRunnerMain.ApplicationRunnerCommandLine cmdLine = new ApplicationRunnerMain.ApplicationRunnerCommandLine(); + OptionSet options = cmdLine.parser().parse(args); + Config orgConfig = cmdLine.loadConfig(options); + Config config = Util.rewriteConfig(orgConfig); + + ApplicationRunner runner = ApplicationRunner.fromConfig(config); + StreamApplication app = (StreamApplication) Class.forName(config.get(STREAM_APPLICATION_CLASS_CONFIG)).newInstance(); + + ApplicationRunnerOperation op = cmdLine.getOperation(options); + + try { + LOGGER.info("Launching stream application: {} to run.", app); + runner.run(app); + runner.waitForFinish(); + } catch (Exception e) { + LOGGER.error("Exception occurred when invoking: {} on application: {}.", op, app, e); + } + } +} diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/PassThroughStreamApplication.java b/samza-test/src/main/java/org/apache/samza/test/integration/PassThroughStreamApplication.java new file mode 100644 index 0000000000..08113bc403 --- /dev/null +++ b/samza-test/src/main/java/org/apache/samza/test/integration/PassThroughStreamApplication.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.test.integration; + +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.operators.StreamGraph; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Acts as a pass through filter for all the events from an input stream. + */ +public class PassThroughStreamApplication implements StreamApplication { + + private static final Logger LOGGER = LoggerFactory.getLogger(PassThroughStreamApplication.class); + + @Override + public void init(StreamGraph graph, Config config) { + String inputStream = config.get("input.stream.name"); + String outputStreamName = "standaloneIntegrationTestKafkaOutputTopic"; + LOGGER.info("Publishing message to: {}.", outputStreamName); + graph.getInputStream(inputStream).sendTo(graph.getOutputStream(outputStreamName)); + } +} diff --git a/samza-test/src/main/python/configs/kafka.json b/samza-test/src/main/python/configs/kafka.json index 91fe23eaad..14b21372c4 100644 --- a/samza-test/src/main/python/configs/kafka.json +++ b/samza-test/src/main/python/configs/kafka.json @@ -3,7 +3,7 @@ "kafka_instance_0": "localhost" }, "kafka_port": 9092, - "kafka_start_cmd": "kafka_2.10-0.10.1.1/bin/kafka-server-start.sh -daemon kafka_2.10-0.10.1.1/config/server.properties", + "kafka_start_cmd": "kafka_2.10-0.10.1.1/bin/kafka-server-start.sh -daemon kafka_2.10-0.10.1.1/config/server.properties --override delete.topic.enable=true", "kafka_stop_cmd": "kafka_2.10-0.10.1.1/bin/kafka-server-stop.sh", "kafka_install_path": "deploy/kafka", 
"kafka_executable": "kafka_2.10-0.10.1.1.tgz", diff --git a/samza-test/src/main/python/deployment.py b/samza-test/src/main/python/deployment.py index 89ba72891d..7cd3cacb4c 100644 --- a/samza-test/src/main/python/deployment.py +++ b/samza-test/src/main/python/deployment.py @@ -72,9 +72,7 @@ def setup_suite(): runtime.set_deployer(name, deployer) for instance, host in c(name + '_hosts').iteritems(): logger.info('Deploying {0} on host: {1}'.format(instance, host)) - deployer.deploy(instance, { - 'hostname': host - }) + deployer.start(instance, {'hostname': host}) # Setup Samza job deployer. samza_job_deployer = SamzaJobYarnDeployer({ diff --git a/samza-test/src/main/python/requirements.txt b/samza-test/src/main/python/requirements.txt index cf43a23620..e39538e445 100644 --- a/samza-test/src/main/python/requirements.txt +++ b/samza-test/src/main/python/requirements.txt @@ -19,3 +19,4 @@ zopkio==0.2.5 requests kafka-python==1.3.3 Jinja2 +kazoo==2.5 \ No newline at end of file diff --git a/samza-test/src/main/python/standalone_deployment.py b/samza-test/src/main/python/standalone_deployment.py new file mode 100644 index 0000000000..c7b25970db --- /dev/null +++ b/samza-test/src/main/python/standalone_deployment.py @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +import zopkio.adhoc_deployer as adhoc_deployer +from zopkio.runtime import get_active_config as c +from subprocess import PIPE, Popen +import logging +import time +import urllib +import os + +TEST_INPUT_TOPIC = 'standaloneIntegrationTestKafkaInputTopic' +TEST_OUTPUT_TOPIC = 'standaloneIntegrationTestKafkaOutputTopic' + +logger = logging.getLogger(__name__) +deployers = {} + +def _download_components(components): + """ + Download the :param components if unavailable in deployment directory using url defined in config. + """ + + for component in components: + url_key = 'url_{0}'.format(component) + url = c(url_key) + filename = os.path.basename(url) + if os.path.exists(filename): + logger.debug('Using cached file: {0}.'.format(filename)) + else: + logger.info('Downloading {0} from {1}.'.format(component, url)) + urllib.urlretrieve(url, filename) + +def _deploy_components(components): + """ + Install and start all the :param components through binaries in deployment directory. 
+ """ + + global deployers + + for component in components: + config = { + 'install_path': os.path.join(c('remote_install_path'), c(component + '_install_path')), + 'executable': c(component + '_executable'), + 'post_install_cmds': c(component + '_post_install_cmds', []), + 'start_command': c(component + '_start_cmd'), + 'stop_command': c(component + '_stop_cmd'), + 'extract': True, + 'sync': True, + } + deployer = adhoc_deployer.SSHDeployer(component, config) + deployers[component] = deployer + for instance, host in c(component + '_hosts').iteritems(): + logger.info('Deploying {0} on host: {1}'.format(instance, host)) + deployer.start(instance, {'hostname': host}) + time.sleep(5) + +def _create_kafka_topic(zookeeper_servers, topic_name, partition_count, replication_factor): + """ + :param zookeeper_servers: Comma separated list of zookeeper servers used for setting up kafka consumer connector. + :param topic_name: name of kafka topic to create. + :param partition_count: Number of partitions of the kafka topic. + :param replication_factor: Replication factor of the kafka topic. + """ + + ### Using command line utility to create kafka topic since kafka python API doesn't support configuring partitionCount during topic creation. + base_dir = os.getcwd() + create_topic_command = 'sh {0}/deploy/kafka/kafka_2.10-0.10.1.1/bin/kafka-topics.sh --create --zookeeper {1} --replication-factor {2} --partitions {3} --topic {4}'.format(base_dir, zookeeper_servers, replication_factor, partition_count, topic_name) + p = Popen(create_topic_command.split(' '), stdin=PIPE, stdout=PIPE, stderr=PIPE) + output, err = p.communicate() + logger.info("Output from create kafka topic: {0}\nstdout: {1}\nstderr: {2}".format(topic_name, output, err)) + +def _delete_kafka_topic(zookeeper_servers, topic_name): + """ + Delete kafka topic defined by the method parameters. + + :param zookeeper_servers: Comma separated list of zookeeper servers used for setting up kafka consumer connector. 
+ :param topic_name: name of kafka topic to delete. + """ + + base_dir = os.getcwd() + delete_topic_command = 'sh {0}/deploy/kafka/kafka_2.10-0.10.1.1/bin/kafka-topics.sh --delete --zookeeper {1} --topic {2}'.format(base_dir, zookeeper_servers, topic_name) + logger.info("Deleting topic: {0}.".format(topic_name)) + p = Popen(delete_topic_command.split(' '), stdin=PIPE, stdout=PIPE, stderr=PIPE) + output, err = p.communicate() + logger.info("Output from delete kafka topic: {0}\nstdout: {1}\nstderr: {2}".format(topic_name, output, err)) + +def setup_suite(): + """ + Setup method that will be run once by zopkio test_runner before all the integration tests. + """ + + ## Download and deploy zk and kafka. Configuration for kafka, zookeeper are defined in kafka.json and zookeeper.json. + _download_components(['zookeeper', 'kafka']) + + _deploy_components(['zookeeper', 'kafka']) + + ## Create input and output topics. + for topic in [TEST_INPUT_TOPIC, TEST_OUTPUT_TOPIC]: + logger.info("Creating topic: {0}.".format(topic)) + _create_kafka_topic('localhost:2181', topic, 3, 1) + +def teardown_suite(): + """ + Teardown method that will be run once by zopkio test_runner after all the integration tests. + """ + for component in ['kafka', 'zookeeper']: + deployer = deployers[component] + for instance, host in c(component + '_hosts').iteritems(): + deployer.undeploy(instance) + + for topic in [TEST_INPUT_TOPIC, TEST_OUTPUT_TOPIC]: + logger.info("Deleting topic: {0}.".format(topic)) + _delete_kafka_topic('localhost:2181', topic) diff --git a/samza-test/src/main/python/standalone_integration_tests.py b/samza-test/src/main/python/standalone_integration_tests.py new file mode 100644 index 0000000000..8405413613 --- /dev/null +++ b/samza-test/src/main/python/standalone_integration_tests.py @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# 'License'); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import os + +dir = os.path.dirname(os.path.abspath(__file__)) + +test = { + 'deployment_code': os.path.join(dir, 'standalone_deployment.py'), + 'perf_code': os.path.join(dir, 'perf.py'), + 'configs_directory': os.path.join(dir, 'configs'), + 'test_code': [ + os.path.join(dir, 'tests', 'standalone_failure_tests.py'), + ], +} diff --git a/samza-test/src/main/python/stream_processor.py b/samza-test/src/main/python/stream_processor.py new file mode 100644 index 0000000000..a7a2bc8768 --- /dev/null +++ b/samza-test/src/main/python/stream_processor.py @@ -0,0 +1,121 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +import logging +import os +import time +import zopkio.adhoc_deployer as adhoc_deployer +from zopkio.remote_host_helper import get_ssh_client, exec_with_env +import zopkio.runtime as runtime + +logger = logging.getLogger(__name__) + +class StreamProcessor: + """ + Represents a standalone StreamProcessor that uses zookeeper for coordination. Used in standalone failure tests + to manage the lifecycle of linux process(start, kill, pause) associated with the StreamProcessor. + """ + + def __init__(self, host_name, processor_id): + """ + :param host_name: Represents the host name in which this StreamProcessor will run. + :param processor_id: Represents the processor_id of StreamProcessor. + """ + start_cmd = 'export SAMZA_LOG_DIR=\"deploy/{0}\"; export JAVA_OPTS=\"$JAVA_OPTS -Xmx2G\"; ./bin/run-class.sh org.apache.samza.test.integration.LocalApplicationRunnerMain --config-path ./config/standalone.failure.test.properties --operation run --config processor.id={0} >> /tmp/{0}.log &' + self.username = runtime.get_username() + self.password = runtime.get_password() + self.processor_id = processor_id + self.host_name = host_name + self.processor_start_command = start_cmd.format(self.processor_id) + logger.info('Running processor start command: {0}'.format(self.processor_start_command)) + self.deployment_config = { + 'install_path': os.path.join(runtime.get_active_config('remote_install_path'), 'deploy/{0}'.format(self.processor_id)), + 'executable': 'samza-test_2.11-0.15.0-SNAPSHOT.tgz', + 'post_install_cmds': [], + 'start_command': self.processor_start_command, + 'stop_command': '', + 'extract': True, + 'sync': True, + } + self.deployer = adhoc_deployer.SSHDeployer(self.processor_id, self.deployment_config) + + def start(self): + """ + Submits the StreamProcessor for execution on a host: host_name. 
+ """ + logger.info("Starting processor with id: {0}.".format(self.processor_id)) + self.deployer.start(self.processor_id, {'hostname': self.host_name}) + + def get_processor_id(self): + """ + Returns the processorId of the StreamProcessor. + """ + return self.processor_id + + def kill(self): + """ + Kills the StreamProcessor process through SIGKILL signal. + """ + self.__send_signal_to_processor("SIGKILL") + + def pause(self): + """ + Pauses the StreamProcessor process through SIGSTOP signal. + """ + self.__send_signal_to_processor("SIGSTOP") + + def resume(self): + """ + Resumes the stream processor process through SIGCONT signal. + """ + self.__send_signal_to_processor("SIGCONT") + + def __send_signal_to_processor(self, signal): + """ + Sends a signal(:param signal) to the linux process of the StreamProcessor. + """ + linux_process_pids = self.__get_pid() + for linux_process_pid in linux_process_pids: + command = "kill -{0} {1}".format(signal, linux_process_pid) + result = self.__execute_command(command) + logger.info("Result of {0} is: {1}.".format(command, result)) + + def __get_pid(self): + """ + Determines the linux process id associated with this StreamProcessor. + """ + ps_command = "ps aux | grep '{0}' | grep -v grep | tr -s ' ' | cut -d ' ' -f 2 | grep -Eo '[0-9]+'".format(self.processor_id) + non_failing_command = "{0}; if [ $? -le 1 ]; then true; else false; fi;".format(ps_command) + logger.info("Executing command: {0}.".format(non_failing_command)) + full_output = self.__execute_command(non_failing_command) + pids = [] + if len(full_output) > 0: + pids = [int(pid_str) for pid_str in full_output.split('\n') if pid_str.isdigit()] + return pids + + def __execute_command(self, command): + """ + Executes the :param command on host: self.host_name. 
+ """ + with get_ssh_client(self.host_name, username=self.username, password=self.password) as ssh: + chan = exec_with_env(ssh, command, msg="Failed to get PID", env={}) + execution_result = '' + while True: + result_buffer = chan.recv(16) + if len(result_buffer) == 0: + break + execution_result += result_buffer + return execution_result diff --git a/samza-test/src/main/python/tests/standalone_failure_tests.py b/samza-test/src/main/python/tests/standalone_failure_tests.py new file mode 100644 index 0000000000..2b412240db --- /dev/null +++ b/samza-test/src/main/python/tests/standalone_failure_tests.py @@ -0,0 +1,307 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import util +import sys +import logging +from kafka import SimpleProducer, SimpleConsumer +import time +import traceback +from stream_processor import StreamProcessor +from zk_client import ZkClient +import threading + +logger = logging.getLogger(__name__) +NUM_MESSAGES = 50 +GROUP_COORDINATION_TIMEOUT = 14 +TEST_OUTPUT_TOPIC = 'standaloneIntegrationTestKafkaOutputTopic' +zk_client = None + +### TODO: In each test add barrier state and processorId validations after fixing data serialization format in zookeeper(SAMZA-1749). 
+def __purge_zk_data(): + """ + Recursively deletes all data nodes created in zookeeper in a test-run. + """ + zk_client.purge_all_nodes() + +def __pump_messages_into_input_topic(): + """ + Produce 50 messages into input topic: standaloneIntegrationTestKafkaInputTopic. + """ + kafka_client = None + input_topic = 'standaloneIntegrationTestKafkaInputTopic' + try: + kafka_client = util.get_kafka_client() + kafka_client.ensure_topic_exists(input_topic) + producer = SimpleProducer(kafka_client, async=False, req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT, ack_timeout=30000) + logger.info('Producing {0} messages to topic: {1}'.format(NUM_MESSAGES, input_topic)) + for message_index in range(1, NUM_MESSAGES + 1): + producer.send_messages(input_topic, str(message_index)) + except: + logger.error(traceback.format_exc(sys.exc_info())) + finally: + if kafka_client is not None: + kafka_client.close() + +def __setup_processors(): + """ + Instantiates and schedules three stream processors for execution in localhost. + :return the instantiated stream processors. + """ + processors = {} + for processor_id in ['standalone-processor-1', 'standalone-processor-2', 'standalone-processor-3']: + processors[processor_id] = StreamProcessor(host_name='localhost', processor_id=processor_id) + processors[processor_id].start() + return processors + +def __tear_down_processors(processors): + """ + Kills all the stream processor passed in :param processors. + """ + for processor_id, processor in processors.iteritems(): + logger.info("Killing processor: {0}.".format(processor_id)) + processor.kill() + +def __setup_zk_client(): + """ + Instantiate a ZkClient to connect to a zookeeper server in localhost. + """ + global zk_client + zk_client = ZkClient(zookeeper_host='127.0.0.1', zookeeper_port='2181', app_name='test-app-name', app_id='test-app-id') + zk_client.start() + +def __teardown_zk_client(): + """ + Stops the ZkClient. 
+ """ + global zk_client + zk_client.stop() + +def job_model_watch(event, expected_processors): + start_time_seconds = time.time() + elapsed_time_seconds = (int)(time.time() - start_time_seconds) + while elapsed_time_seconds <= 30: + recent_job_model = zk_client.get_latest_job_model() + if recent_job_model['containers'].keys() == expected_processors: + event.set() + return + else: + time.sleep(2) + elapsed_time_seconds = (int)(time.time() - start_time_seconds) + + +def test_kill_leader(): + """ + Launches three stream processors. Kills the leader processor. Waits till the group coordination timeout + and verifies that the final JobModel contains both the followers. + """ + processors = {} + try: + __setup_zk_client() + __pump_messages_into_input_topic() + processors = __setup_processors() + + leader_processor_id = zk_client.get_leader_processor_id() + processors.pop(leader_processor_id).kill() + + event = threading.Event() + zk_client.watch_job_model(job_model_watch(event = event, expected_processors=processors.keys())) + + event.wait(2 * GROUP_COORDINATION_TIMEOUT) + + job_model = zk_client.get_latest_job_model() + for processor_id, deployer in processors.iteritems(): + assert processor_id in job_model['containers'], 'Processor id: {0} does not exist in JobModel: {1}.'.format(processor_id, job_model) + assert leader_processor_id not in job_model['containers'], 'Leader processor: {0} exists in JobModel: {1}.'.format(leader_processor_id, job_model) + except: + ## Explicitly logging exception, since zopkio doesn't log complete stacktrace. + logger.error(traceback.format_exc(sys.exc_info())) + raise + finally: + __tear_down_processors(processors) + __purge_zk_data() + __teardown_zk_client() + +def test_kill_one_follower(): + """ + Launches three stream processors. Kills one follower processor. Waits till the group coordination timeout and + verifies that the final JobModel contains the leader processor and un-killed follower processor. 
+ """ + processors = {} + try: + __setup_zk_client() + __pump_messages_into_input_topic() + processors = __setup_processors() + + leader_processor_id = zk_client.get_leader_processor_id() + for processor_id, deployer in processors.iteritems(): + if processor_id != leader_processor_id: + processors.pop(processor_id).kill() + break + + event = threading.Event() + zk_client.watch_job_model(job_model_watch(event = event, expected_processors=processors.keys())) + + event.wait(GROUP_COORDINATION_TIMEOUT * 2) + + job_model = zk_client.get_latest_job_model() + for processor_id, deployer in processors.iteritems(): + assert processor_id in job_model['containers'], 'Processor id: {0} does not exist in JobModel: {1}.'.format(processor_id, job_model) + except: + ## Explicitly logging exception, since zopkio doesn't log complete stacktrace. + logger.error(traceback.format_exc(sys.exc_info())) + raise + finally: + __tear_down_processors(processors) + __purge_zk_data() + __teardown_zk_client() + +def test_kill_multiple_followers(): + """ + Launches three stream processors. Kills both the follower processors. Waits for group coordination timeout + and verifies that the final JobModel contains only the leader processor. + """ + processors = {} + try: + __setup_zk_client() + __pump_messages_into_input_topic() + processors = __setup_processors() + + leader_processor_id = zk_client.get_leader_processor_id() + for processor_id in processors.keys(): + if processor_id != leader_processor_id: + follower = processors.pop(processor_id) + follower.kill() + + event = threading.Event() + zk_client.watch_job_model(job_model_watch(event = event, expected_processors=[leader_processor_id])) + + event.wait(GROUP_COORDINATION_TIMEOUT * 2) + + ## Verifications after killing the processors. 
+ job_model = zk_client.get_latest_job_model() + + assert leader_processor_id in job_model['containers'], 'Leader processor: {0} does not exist in JobModel: {1}.'.format(leader_processor_id, job_model) + except: + ## Explicitly logging exception, since zopkio doesn't log complete stacktrace. + logger.error(traceback.format_exc(sys.exc_info())) + raise + finally: + __tear_down_processors(processors) + __purge_zk_data() + __teardown_zk_client() + +def test_kill_leader_and_a_follower(): + """ + Launches three stream processors. Kills both a leader and a follower processors. + Waits till the group coordination timeout and verifies that the final JobModel contains only one processor. + """ + processors = {} + try: + __setup_zk_client() + __pump_messages_into_input_topic() + processors = __setup_processors() + + leader_processor_id = zk_client.get_leader_processor_id() + processors.pop(leader_processor_id).kill() + + for processor_id in processors.keys(): + processors.pop(processor_id).kill() + break + + event = threading.Event() + zk_client.watch_job_model(job_model_watch(event = event, expected_processors=processors.keys())) + + event.wait(GROUP_COORDINATION_TIMEOUT * 2) + + ## Verifications after killing the processors. + job_model = zk_client.get_latest_job_model() + for processor_id in processors.keys(): + assert processor_id in job_model['containers'], 'Processor id: {0} does not exist in JobModel: {1}.'.format(processor_id, job_model) + assert leader_processor_id not in job_model['containers'], 'Leader processor id: {0} exists in JobModel: {1}.'.format(leader_processor_id, job_model) + except: + ## Explicitly logging exception, since zopkio doesn't log complete stacktrace. + logger.error(traceback.format_exc(sys.exc_info())) + raise + finally: + __tear_down_processors(processors) + __purge_zk_data() + __teardown_zk_client() + +def test_pause_resume_leader(): + """ + Launches three processors. Pauses the leader processor. 
Wait till group coordination timeout and verifies that the + JobModel doesn't contain leader processor. Resumes the leader processor and waits till group coordination timeout, + verifies that new JobModel contains the leader processor. + """ + processors = {} + try: + __setup_zk_client() + __pump_messages_into_input_topic() + processors = __setup_processors() + + event = threading.Event() + zk_client.watch_job_model(job_model_watch(event = event, expected_processors=processors.keys())) + + event.wait(GROUP_COORDINATION_TIMEOUT * 2) + + ## First JobModel generation. + job_model = zk_client.get_latest_job_model() + for processor_id, deployer in processors.iteritems(): + assert processor_id in job_model['containers'], 'Processor id: {0} does not exist in JobModel: {1}.'.format(processor_id, job_model) + + leader_processor_id = zk_client.get_leader_processor_id() + leader = processors.pop(leader_processor_id) + + logger.info("Pausing the leader processor: {0}.".format(leader_processor_id)) + leader.pause() + + event = threading.Event() + zk_client.watch_job_model(job_model_watch(event = event, expected_processors=processors.keys())) + + event.wait(GROUP_COORDINATION_TIMEOUT * 2) + + ## Verifications after leader was suspended. + job_model = zk_client.get_latest_job_model() + for processor_id, deployer in processors.iteritems(): + assert processor_id in job_model['containers'], 'Processor id: {0} does not exist in containerModel: {1}.'.format(processor_id, job_model['containers']) + + logger.info("Resuming the leader processor: {0}.".format(leader_processor_id)) + leader.resume() + + event = threading.Event() + expected_processors = processors.keys() + expected_processors.append(leader_processor_id) + zk_client.watch_job_model(job_model_watch(event = event, expected_processors=expected_processors)) + + event.wait(GROUP_COORDINATION_TIMEOUT * 2) + + job_model = zk_client.get_latest_job_model() + + ## Verifications after leader was resumed. 
+ assert leader_processor_id in job_model['containers'], 'Processor id: {0} does not exist in containerModel: {1}.'.format(leader_processor_id, job_model['containers']) + for processor_id, deployer in processors.iteritems(): + assert processor_id in job_model['containers'], 'Processor id: {0} does not exist in containerModel: {1}.'.format(processor_id, job_model['containers']) + + leader.kill() + except: + ## Explicitly logging exception, since zopkio doesn't log complete stacktrace. + logger.error(traceback.format_exc(sys.exc_info())) + raise + finally: + __tear_down_processors(processors) + __purge_zk_data() + __teardown_zk_client() diff --git a/samza-test/src/main/python/tests/zk_client.py b/samza-test/src/main/python/tests/zk_client.py new file mode 100644 index 0000000000..f5d17980ad --- /dev/null +++ b/samza-test/src/main/python/tests/zk_client.py @@ -0,0 +1,130 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import json +from kazoo.client import KazooClient +import logging +import sys +import traceback + +logger = logging.getLogger(__name__) + +class ZkClient: + + """ + Wrapper class over KazooClient. Provides utility methods for standalone failure tests to get details about + processor group state stored in zookeeper. 
+ + Instantiates a kazoo client to connect to zookeeper server at :param zookeeper_host::param zookeeper_port. + """ + def __init__(self, zookeeper_host, zookeeper_port, app_name, app_id): + self.kazoo_client = KazooClient(hosts='{0}:{1}'.format(zookeeper_host, zookeeper_port)) + self.zk_base_node = 'app-{0}-{1}/{2}-{3}-coordinationData'.format(app_name, app_id, app_name, app_id) + + def start(self): + """ + Establishes connection with the zookeeper server at self.host_name:self.port. + """ + self.kazoo_client.start() + + def stop(self): + """ + Closes and releases the connection held with the zookeeper server. + """ + self.kazoo_client.stop() + + def watch_job_model(self, watch_function): + self.kazoo_client.ensure_path('{0}/JobModelGeneration/jobModels/'.format(self.zk_base_node)) + self.kazoo_client.get_children('{0}/JobModelGeneration/jobModels/'.format(self.zk_base_node), watch=watch_function) + + def get_latest_job_model(self): + """ + Reads and returns the latest JobModel from zookeeper. + """ + job_model_dict = {} + try: + childZkNodes = self.kazoo_client.get_children('{0}/JobModelGeneration/jobModels/'.format(self.zk_base_node)) + if len(childZkNodes) > 0: + childZkNodes.sort() + childZkNodes.reverse() + + job_model_generation_path = '{0}/JobModelGeneration/jobModels/{1}/'.format(self.zk_base_node, childZkNodes[0]) + job_model, _ = self.kazoo_client.get(job_model_generation_path) + + """ + ZkClient java library stores the data in the following format in zookeeper: + class_name, data_length, actual_data + + JobModel json manipulation: Delete all the characters before first occurrence of '{' in jobModel json string. + + Normal json deserialization without the above custom string massaging fails. This will be removed after SAMZA-1749. 
+ """ + + first_curly_brace_index = job_model.find('{') + job_model = job_model[first_curly_brace_index: ] + job_model_dict = json.loads(job_model) + logger.info("Recent JobModel in zookeeper: {0}".format(job_model_dict)) + except: + logger.error(traceback.format_exc(sys.exc_info())) + return job_model_dict + + def get_leader_processor_id(self): + """ + Determines the processorId of the current leader in a processors group. + + Returns the processorId of the leader if leader exists. + Returns None otherwise. + """ + leader_processor_id = None + try: + processors_path = '{0}/processors'.format(self.zk_base_node) + childZkNodes = self.kazoo_client.get_children(processors_path) + childZkNodes.sort() + child_processor_path = '{0}/{1}'.format(processors_path, childZkNodes[0]) + processor_data, _ = self.kazoo_client.get(child_processor_path) + host, leader_processor_id = processor_data.split(" ") + except: + logger.error(traceback.format_exc(sys.exc_info())) + return leader_processor_id + + def purge_all_nodes(self): + """ + Recursively delete all zookeeper nodes from the base node: self.zk_base_node. + """ + try: + self.kazoo_client.delete(path=self.zk_base_node, version=-1, recursive=True) + except: + logger.error(traceback.format_exc(sys.exc_info())) + + def get_all_ephemeral_processors(self): + """ + Determines the processor ids that are active in zookeeper. 
+ """ + processor_ids = [] + try: + processors_path = '{0}/processors'.format(self.zk_base_node) + childZkNodes = self.kazoo_client.get_children(processors_path) + childZkNodes.sort() + + for childZkNode in childZkNodes: + child_processor_path = '{0}/{1}'.format(processors_path, childZkNode) + processor_data, _ = self.kazoo_client.get(child_processor_path) + host, processor_id = processor_data.split(" ") + processor_ids.append(processor_id) + except: + logger.error(traceback.format_exc(sys.exc_info())) + + return processor_ids From 05b08f235f7a6e78e48bf7b050c47fd5ff6bdd9c Mon Sep 17 00:00:00 2001 From: Shanthoosh Venkataraman Date: Thu, 14 Jun 2018 19:46:10 -0700 Subject: [PATCH 2/5] Minor doc changes. --- .../samza/test/integration/LocalApplicationRunnerMain.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java index cb5521a2b2..e8be592e03 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java @@ -35,7 +35,7 @@ * {@link ApplicationRunnerMain} was designed for deploying {@link StreamApplication} in yarn * and doesn't work for in standalone. * - * This runner class is for standalone failure tests and not recommended for general use. + * This runner class is built for standalone failure tests and not recommended for general use. */ public class LocalApplicationRunnerMain { From a0f6300c9a9a797aa6bbe9003ccf19aaf8d63132 Mon Sep 17 00:00:00 2001 From: Shanthoosh Venkataraman Date: Thu, 14 Jun 2018 19:58:21 -0700 Subject: [PATCH 3/5] Minor comment changes. 
--- .../main/python/tests/standalone_failure_tests.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/samza-test/src/main/python/tests/standalone_failure_tests.py b/samza-test/src/main/python/tests/standalone_failure_tests.py index 2b412240db..1cf2b7d0d2 100644 --- a/samza-test/src/main/python/tests/standalone_failure_tests.py +++ b/samza-test/src/main/python/tests/standalone_failure_tests.py @@ -94,14 +94,13 @@ def job_model_watch(event, expected_processors): start_time_seconds = time.time() elapsed_time_seconds = (int)(time.time() - start_time_seconds) while elapsed_time_seconds <= 30: - recent_job_model = zk_client.get_latest_job_model() - if recent_job_model['containers'].keys() == expected_processors: - event.set() - return - else: - time.sleep(2) - elapsed_time_seconds = (int)(time.time() - start_time_seconds) - + recent_job_model = zk_client.get_latest_job_model() + if recent_job_model['containers'].keys() == expected_processors: + event.set() + return + else: + time.sleep(2) + elapsed_time_seconds = (int)(time.time() - start_time_seconds) def test_kill_leader(): """ From b703fb3f41a1c2d02114201c84b0e8de36b90d7c Mon Sep 17 00:00:00 2001 From: Shanthoosh Venkataraman Date: Fri, 15 Jun 2018 09:52:50 -0700 Subject: [PATCH 4/5] Minor python doc cleanup. --- samza-test/src/main/python/tests/standalone_failure_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samza-test/src/main/python/tests/standalone_failure_tests.py b/samza-test/src/main/python/tests/standalone_failure_tests.py index 1cf2b7d0d2..9a4edc368d 100644 --- a/samza-test/src/main/python/tests/standalone_failure_tests.py +++ b/samza-test/src/main/python/tests/standalone_failure_tests.py @@ -244,7 +244,7 @@ def test_pause_resume_leader(): """ Launches three processors. Pauses the leader processor. Wait till group coordination timeout and verifies that the JobModel doesn't contain leader processor. 
Resumes the leader processor and waits till group coordination timeout, - verifies that new JobModel contains the leader processor. + verifies that new JobModel contains the previously paused leader processor. """ processors = {} try: From 61653e4de8ab58475f124763bb81f28d59322774 Mon Sep 17 00:00:00 2001 From: Shanthoosh Venkataraman Date: Thu, 21 Jun 2018 14:46:58 -0700 Subject: [PATCH 5/5] Review comments. --- .../config/standalone.failure.test.properties | 6 +- ...TestStandaloneIntegrationApplication.java} | 4 +- .../src/main/python/standalone_deployment.py | 39 ++--- .../python/tests/standalone_failure_tests.py | 161 +++++++++--------- samza-test/src/main/python/tests/zk_client.py | 3 +- 5 files changed, 107 insertions(+), 106 deletions(-) rename samza-test/src/main/java/org/apache/samza/test/integration/{PassThroughStreamApplication.java => TestStandaloneIntegrationApplication.java} (88%) diff --git a/samza-test/src/main/config/standalone.failure.test.properties b/samza-test/src/main/config/standalone.failure.test.properties index a5150c5316..d855d5f020 100644 --- a/samza-test/src/main/config/standalone.failure.test.properties +++ b/samza-test/src/main/config/standalone.failure.test.properties @@ -18,7 +18,7 @@ app.runner.class=org.apache.samza.runtime.LocalApplicationRunner -app.class=org.apache.samza.test.integration.PassThroughStreamApplication +app.class=org.apache.samza.test.integration.TestStandaloneIntegrationApplication app.name=test-app-name app.id=test-app-id @@ -26,8 +26,8 @@ job.name=test-app-name job.id=test-app-id ## Kafka I/O system properties. 
-task.inputs=standaloneIntegrationTestKafkaInputTopic -input.stream.name=standaloneIntegrationTestKafkaInputTopic +task.inputs=standalone_integration_test_kafka_input_topic +input.stream.name=standalone_integration_test_kafka_input_topic job.default.system=testSystemName systems.testSystemName.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory systems.testSystemName.producer.bootstrap.servers=localhost:9092 diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/PassThroughStreamApplication.java b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java similarity index 88% rename from samza-test/src/main/java/org/apache/samza/test/integration/PassThroughStreamApplication.java rename to samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java index 08113bc403..f6e3d5fe6e 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/PassThroughStreamApplication.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java @@ -28,9 +28,9 @@ /** * Acts as a pass through filter for all the events from a input stream. 
*/ -public class PassThroughStreamApplication implements StreamApplication { +public class TestStandaloneIntegrationApplication implements StreamApplication { - private static final Logger LOGGER = LoggerFactory.getLogger(PassThroughStreamApplication.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TestStandaloneIntegrationApplication.class); @Override public void init(StreamGraph graph, Config config) { diff --git a/samza-test/src/main/python/standalone_deployment.py b/samza-test/src/main/python/standalone_deployment.py index c7b25970db..cd038efb1c 100644 --- a/samza-test/src/main/python/standalone_deployment.py +++ b/samza-test/src/main/python/standalone_deployment.py @@ -22,12 +22,28 @@ import urllib import os -TEST_INPUT_TOPIC = 'standaloneIntegrationTestKafkaInputTopic' -TEST_OUTPUT_TOPIC = 'standaloneIntegrationTestKafkaOutputTopic' +TEST_INPUT_TOPIC = 'standalone_integration_test_kafka_input_topic' +TEST_OUTPUT_TOPIC = 'standalone_integration_test_kafka_output_topic' logger = logging.getLogger(__name__) deployers = {} +def setup_suite(): + """ + Setup method that will be run once by zopkio test_runner before all the integration tests. + """ + ## Download and deploy zk and kafka. Configuration for kafka, zookeeper are defined in kafka.json and zookeeper.json. + _download_components(['zookeeper', 'kafka']) + + _deploy_components(['zookeeper', 'kafka']) + + ## Create input and output topics. + for topic in [TEST_INPUT_TOPIC, TEST_OUTPUT_TOPIC]: + logger.info("Deleting topic: {0}.".format(topic)) + _delete_kafka_topic('localhost:2181', topic) + logger.info("Creating topic: {0}.".format(topic)) + _create_kafka_topic('localhost:2181', topic, 3, 1) + def _download_components(components): """ Download the :param components if unavailable in deployment directory using url defined in config. 
@@ -97,21 +113,6 @@ def _delete_kafka_topic(zookeeper_servers, topic_name): output, err = p.communicate() logger.info("Output from delete kafka topic: {0}\nstdout: {1}\nstderr: {2}".format(topic_name, output, err)) -def setup_suite(): - """ - Setup method that will be run once by zopkio test_runner before all the integration tests. - """ - - ## Download and deploy zk and kafka. Configuration for kafka, zookeeper are defined in kafka.json and zookeeper.json. - _download_components(['zookeeper', 'kafka']) - - _deploy_components(['zookeeper', 'kafka']) - - ## Create input and output topics. - for topic in [TEST_INPUT_TOPIC, TEST_OUTPUT_TOPIC]: - logger.info("Creating topic: {0}.".format(topic)) - _create_kafka_topic('localhost:2181', topic, 3, 1) - def teardown_suite(): """ Teardown method that will be run once by zopkio test_runner after all the integration tests. @@ -120,7 +121,3 @@ def teardown_suite(): deployer = deployers[component] for instance, host in c(component + '_hosts').iteritems(): deployer.undeploy(instance) - - for topic in [TEST_INPUT_TOPIC, TEST_OUTPUT_TOPIC]: - logger.info("Deleting topic: {0}.".format(topic)) - _delete_kafka_topic('localhost:2181', topic) diff --git a/samza-test/src/main/python/tests/standalone_failure_tests.py b/samza-test/src/main/python/tests/standalone_failure_tests.py index 9a4edc368d..0fab7421bf 100644 --- a/samza-test/src/main/python/tests/standalone_failure_tests.py +++ b/samza-test/src/main/python/tests/standalone_failure_tests.py @@ -27,7 +27,7 @@ logger = logging.getLogger(__name__) NUM_MESSAGES = 50 GROUP_COORDINATION_TIMEOUT = 14 -TEST_OUTPUT_TOPIC = 'standaloneIntegrationTestKafkaOutputTopic' +TEST_OUTPUT_TOPIC = 'standalone_integration_test_kafka_output_topic' zk_client = None ### TODO: In each test add barrier state and processorId validations after fixing data serialization format in zookeeper(SAMZA-1749). 
@@ -39,10 +39,10 @@ def __purge_zk_data(): def __pump_messages_into_input_topic(): """ - Produce 50 messages into input topic: standaloneIntegrationTestKafkaInputTopic. + Produce 50 messages into input topic: standalone_integration_test_kafka_input_topic. """ kafka_client = None - input_topic = 'standaloneIntegrationTestKafkaInputTopic' + input_topic = 'standalone_integration_test_kafka_input_topic' try: kafka_client = util.get_kafka_client() kafka_client.ensure_topic_exists(input_topic) @@ -90,18 +90,42 @@ def __teardown_zk_client(): global zk_client zk_client.stop() -def job_model_watch(event, expected_processors): +def job_model_watcher(event, expected_processors): start_time_seconds = time.time() elapsed_time_seconds = (int)(time.time() - start_time_seconds) while elapsed_time_seconds <= 30: recent_job_model = zk_client.get_latest_job_model() - if recent_job_model['containers'].keys() == expected_processors: + if set(recent_job_model['containers'].keys()) == set(expected_processors): event.set() - return + return else: - time.sleep(2) + time.sleep(2) elapsed_time_seconds = (int)(time.time() - start_time_seconds) +def __validate_job_model(job_model, killed_processors=[]): + ## Validate the TaskModel. Check if all the partitions are assigned to the containers. 
+ expected_ssps = [{u'partition': 0, u'system': u'testSystemName', u'stream': u'standalone_integration_test_kafka_input_topic'}, + {u'partition': 1, u'system': u'testSystemName', u'stream': u'standalone_integration_test_kafka_input_topic'}, + {u'partition': 2, u'system': u'testSystemName', u'stream': u'standalone_integration_test_kafka_input_topic'}] + actual_ssps = [] + for container_id, tasks in job_model['containers'].iteritems(): + for partition, ssps in tasks['tasks'].iteritems(): + actual_ssps.append(ssps['system-stream-partitions'][0]) + actual_ssps.sort() + assert expected_ssps == actual_ssps, 'Expected ssp: {0}, Actual ssp: {1}.'.format(expected_ssps, actual_ssps) + + ## Validate the ContainerModel. Live processors should be present in the JobModel and killed processors should not be in JobModel. + active_processors = zk_client.get_active_processors() + assert set(active_processors) == set(job_model['containers'].keys()), 'ProcessorIds: {0} does not exist in JobModel: {1}.'.format(active_processors, job_model['containers'].keys()) + for processor_id in killed_processors: + assert processor_id not in job_model['containers'], 'Processor: {0} exists in JobModel: {1}.'.format(processor_id, job_model) + +def __get_job_model(expected_processors): + event = threading.Event() + zk_client.watch_job_model(job_model_watcher(event=event, expected_processors=expected_processors)) + event.wait(2 * GROUP_COORDINATION_TIMEOUT) + return zk_client.get_latest_job_model() + def test_kill_leader(): """ Launches three stream processors. Kills the leader processor. Waits till the group coordination timeout @@ -113,18 +137,17 @@ def test_kill_leader(): __pump_messages_into_input_topic() processors = __setup_processors() + ## Validations before killing the leader. 
+ job_model = __get_job_model(expected_processors=processors.keys()) + __validate_job_model(job_model, []) + leader_processor_id = zk_client.get_leader_processor_id() processors.pop(leader_processor_id).kill() - event = threading.Event() - zk_client.watch_job_model(job_model_watch(event = event, expected_processors=processors.keys())) - - event.wait(2 * GROUP_COORDINATION_TIMEOUT) - - job_model = zk_client.get_latest_job_model() - for processor_id, deployer in processors.iteritems(): - assert processor_id in job_model['containers'], 'Processor id: {0} does not exist in JobModel: {1}.'.format(processor_id, job_model) - assert leader_processor_id not in job_model['containers'], 'Leader processor: {0} exists in JobModel: {1}.'.format(leader_processor_id, job_model) + ## Validations after killing the leader. + job_model = __get_job_model(expected_processors=processors.keys()) + assert leader_processor_id != zk_client.get_leader_processor_id(), '{0} is still the leader'.format(leader_processor_id) + __validate_job_model(job_model, [leader_processor_id]) except: ## Explicitly logging exception, since zopkio doesn't log complete stacktrace. logger.error(traceback.format_exc(sys.exc_info())) @@ -145,20 +168,23 @@ def test_kill_one_follower(): __pump_messages_into_input_topic() processors = __setup_processors() - leader_processor_id = zk_client.get_leader_processor_id() + leader_processor_id, killed_processors = zk_client.get_leader_processor_id(), [] + + ## Validations before killing the follower. 
+ job_model = __get_job_model(expected_processors=processors.keys()) + __validate_job_model(job_model) + for processor_id, deployer in processors.iteritems(): if processor_id != leader_processor_id: - processors.pop(processor_id).kill() + follower = processors.pop(processor_id) + follower.kill() + killed_processors.append(follower) break - event = threading.Event() - zk_client.watch_job_model(job_model_watch(event = event, expected_processors=processors.keys())) - - event.wait(GROUP_COORDINATION_TIMEOUT * 2) - - job_model = zk_client.get_latest_job_model() - for processor_id, deployer in processors.iteritems(): - assert processor_id in job_model['containers'], 'Processor id: {0} does not exist in JobModel: {1}.'.format(processor_id, job_model) + ## Validations after killing the follower. + job_model = __get_job_model(expected_processors=processors.keys()) + assert leader_processor_id == zk_client.get_leader_processor_id(), '{0} is not the leader'.format(leader_processor_id) + __validate_job_model(job_model, killed_processors) except: ## Explicitly logging exception, since zopkio doesn't log complete stacktrace. logger.error(traceback.format_exc(sys.exc_info())) @@ -179,21 +205,21 @@ def test_kill_multiple_followers(): __pump_messages_into_input_topic() processors = __setup_processors() - leader_processor_id = zk_client.get_leader_processor_id() + ## Validations before killing the followers. 
+ job_model = __get_job_model(expected_processors=processors.keys()) + __validate_job_model(job_model) + + leader_processor_id, killed_processors = zk_client.get_leader_processor_id(), [] + for processor_id in processors.keys(): if processor_id != leader_processor_id: follower = processors.pop(processor_id) + killed_processors.append(follower) follower.kill() - event = threading.Event() - zk_client.watch_job_model(job_model_watch(event = event, expected_processors=[leader_processor_id])) - - event.wait(GROUP_COORDINATION_TIMEOUT * 2) - - ## Verifications after killing the processors. - job_model = zk_client.get_latest_job_model() - - assert leader_processor_id in job_model['containers'], 'Leader processor: {0} does not exist in JobModel: {1}.'.format(leader_processor_id, job_model) + ## Validations after killing the followers. + job_model = __get_job_model(expected_processors=processors.keys()) + __validate_job_model(job_model, killed_processors) except: ## Explicitly logging exception, since zopkio doesn't log complete stacktrace. logger.error(traceback.format_exc(sys.exc_info())) @@ -215,22 +241,23 @@ def test_kill_leader_and_a_follower(): processors = __setup_processors() leader_processor_id = zk_client.get_leader_processor_id() + + ## Validations before killing the leader and follower. + job_model = __get_job_model(expected_processors=processors.keys()) + __validate_job_model(job_model) + + killed_processors = [leader_processor_id] processors.pop(leader_processor_id).kill() for processor_id in processors.keys(): - processors.pop(processor_id).kill() + follower = processors.pop(processor_id) + killed_processors.append(processor_id) + follower.kill() break - event = threading.Event() - zk_client.watch_job_model(job_model_watch(event = event, expected_processors=processors.keys())) - - event.wait(GROUP_COORDINATION_TIMEOUT * 2) - - ## Verifications after killing the processors. 
- job_model = zk_client.get_latest_job_model() - for processor_id in processors.keys(): - assert processor_id in job_model['containers'], 'Processor id: {0} does not exist in JobModel: {1}.'.format(processor_id, job_model) - assert leader_processor_id not in job_model['containers'], 'Leader processor id: {0} exists in JobModel: {1}.'.format(leader_processor_id, job_model) + ## Validations after killing the leader and follower. + job_model = __get_job_model(expected_processors=processors.keys()) + __validate_job_model(job_model, killed_processors) except: ## Explicitly logging exception, since zopkio doesn't log complete stacktrace. logger.error(traceback.format_exc(sys.exc_info())) @@ -252,15 +279,9 @@ def test_pause_resume_leader(): __pump_messages_into_input_topic() processors = __setup_processors() - event = threading.Event() - zk_client.watch_job_model(job_model_watch(event = event, expected_processors=processors.keys())) - - event.wait(GROUP_COORDINATION_TIMEOUT * 2) - - ## First JobModel generation. - job_model = zk_client.get_latest_job_model() - for processor_id, deployer in processors.iteritems(): - assert processor_id in job_model['containers'], 'Processor id: {0} does not exist in JobModel: {1}.'.format(processor_id, job_model) + ## Validations before pausing the leader. + job_model = __get_job_model(expected_processors=processors.keys()) + __validate_job_model(job_model) leader_processor_id = zk_client.get_leader_processor_id() leader = processors.pop(leader_processor_id) @@ -268,32 +289,16 @@ def test_pause_resume_leader(): logger.info("Pausing the leader processor: {0}.".format(leader_processor_id)) leader.pause() - event = threading.Event() - zk_client.watch_job_model(job_model_watch(event = event, expected_processors=processors.keys())) - - event.wait(GROUP_COORDINATION_TIMEOUT * 2) - - ## Verifications after leader was suspended. 
- job_model = zk_client.get_latest_job_model() - for processor_id, deployer in processors.iteritems(): - assert processor_id in job_model['containers'], 'Processor id: {0} does not exist in containerModel: {1}.'.format(processor_id, job_model['containers']) + ## Validations after pausing the leader. + job_model = __get_job_model(expected_processors=processors.keys()) + __validate_job_model(job_model, [leader_processor_id]) logger.info("Resuming the leader processor: {0}.".format(leader_processor_id)) leader.resume() - event = threading.Event() - expected_processors = processors.keys() - expected_processors.append(leader_processor_id) - zk_client.watch_job_model(job_model_watch(event = event, expected_processors=expected_processors)) - - event.wait(GROUP_COORDINATION_TIMEOUT * 2) - - job_model = zk_client.get_latest_job_model() - - ## Verifications after leader was resumed. - assert leader_processor_id in job_model['containers'], 'Processor id: {0} does not exist in containerModel: {1}.'.format(leader_processor_id, job_model['containers']) - for processor_id, deployer in processors.iteritems(): - assert processor_id in job_model['containers'], 'Processor id: {0} does not exist in containerModel: {1}.'.format(processor_id, job_model['containers']) + ## Validations after resuming the leader. + job_model = __get_job_model(expected_processors=processors.keys()) + __validate_job_model(job_model) leader.kill() except: diff --git a/samza-test/src/main/python/tests/zk_client.py b/samza-test/src/main/python/tests/zk_client.py index f5d17980ad..2a11a80406 100644 --- a/samza-test/src/main/python/tests/zk_client.py +++ b/samza-test/src/main/python/tests/zk_client.py @@ -109,7 +109,7 @@ def purge_all_nodes(self): except: logger.error(traceback.format_exc(sys.exc_info())) - def get_all_ephemeral_processors(self): + def get_active_processors(self): """ Determines the processor ids that are active in zookeeper. 
""" @@ -126,5 +126,4 @@ def get_all_ephemeral_processors(self): processor_ids.append(processor_id) except: logger.error(traceback.format_exc(sys.exc_info())) - return processor_ids