Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ To run key-value performance tests:

./gradlew samza-shell:kvPerformanceTest -PconfigPath=file://$PWD/samza-test/src/main/config/perf/kv-perf.properties

To run all integration tests:
To run yarn integration tests:

./bin/integration-tests.sh <dir>
./bin/integration-tests.sh <dir> yarn-integration-tests

To run standalone integration tests:

./bin/integration-tests.sh <dir> standalone-integration-tests

### Running checkstyle on the java code ###

Expand Down
17 changes: 13 additions & 4 deletions bin/integration-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
BASE_DIR=$DIR/..
TEST_DIR=$1
FAILURE_TEST_TYPE=$2

if test -z "$TEST_DIR"; then
echo
Expand Down Expand Up @@ -70,17 +71,25 @@ source $SAMZA_INTEGRATION_TESTS_DIR/bin/activate
# install zopkio and requests
pip install -r $SCRIPTS_DIR/requirements.txt

# treat all trailing parameters (after dirname) as zopkio switches
# treat all trailing parameters (after dirname, test_type) as zopkio switches
shift
SWITCHES="$*"
SWITCHES="${*:3}"

# default to info-level debugging if not specified
if [[ $SWITCHES != *"console-log-level"* ]]; then
SWITCHES="$SWITCHES --console-log-level INFO"
fi

# run the tests
zopkio --config-overrides remote_install_path=$ABS_TEST_DIR $SWITCHES $SCRIPTS_DIR/integration_tests.py
if [[ ${FAILURE_TEST_TYPE} == "yarn-integration-tests" ]]; then
echo "Running yarn integration tests."
zopkio --config-overrides remote_install_path=$ABS_TEST_DIR $SWITCHES $SCRIPTS_DIR/integration_tests.py
elif [[ ${FAILURE_TEST_TYPE} == "standalone-integration-tests" ]]; then
echo "Running standalone integration tests."
zopkio --config-overrides remote_install_path=$ABS_TEST_DIR $SWITCHES $SCRIPTS_DIR/standalone_integration_tests.py
else
echo "Invalid failure test type: $FAILURE_TEST_TYPE"
exit -1
fi

# go back to execution directory
deactivate
Expand Down
45 changes: 45 additions & 0 deletions samza-test/src/main/config/standalone.failure.test.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

app.runner.class=org.apache.samza.runtime.LocalApplicationRunner

app.class=org.apache.samza.test.integration.TestStandaloneIntegrationApplication

app.name=test-app-name
app.id=test-app-id
job.name=test-app-name
job.id=test-app-id

## Kafka I/O system properties.
task.inputs=standalone_integration_test_kafka_input_topic
input.stream.name=standalone_integration_test_kafka_input_topic
job.default.system=testSystemName
systems.testSystemName.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.testSystemName.producer.bootstrap.servers=localhost:9092
systems.testSystemName.consumer.zookeeper.connect=localhost:2181

## Zookeeper coordination properties
job.coordinator.zk.connect=localhost:2181
job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
task.shutdown.ms=4000
job.debounce.time.ms=4000
job.coordinator.zk.consensus.timeout.ms=4000
job.coordinator.zk.session.timeout.ms=4000

job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.GroupByPartitionFactory
task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.test.integration;

import joptsimple.OptionSet;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.runtime.ApplicationRunnerMain;
import org.apache.samza.runtime.ApplicationRunnerOperation;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.samza.runtime.ApplicationRunnerMain.STREAM_APPLICATION_CLASS_CONFIG;

/**
* {@link ApplicationRunnerMain} was designed for deploying {@link StreamApplication} in yarn
* and doesn't work for in standalone.
*
* This runner class is built for standalone failure tests and not recommended for general use.
*/
public class LocalApplicationRunnerMain {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please be specific about the application name and add test to it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


private static final Logger LOGGER = LoggerFactory.getLogger(LocalApplicationRunnerMain.class);

public static void main(String[] args) throws Exception {
ApplicationRunnerMain.ApplicationRunnerCommandLine cmdLine = new ApplicationRunnerMain.ApplicationRunnerCommandLine();
OptionSet options = cmdLine.parser().parse(args);
Config orgConfig = cmdLine.loadConfig(options);
Config config = Util.rewriteConfig(orgConfig);

ApplicationRunner runner = ApplicationRunner.fromConfig(config);
StreamApplication app = (StreamApplication) Class.forName(config.get(STREAM_APPLICATION_CLASS_CONFIG)).newInstance();

ApplicationRunnerOperation op = cmdLine.getOperation(options);

try {
LOGGER.info("Launching stream application: {} to run.", app);
runner.run(app);
runner.waitForFinish();
} catch (Exception e) {
LOGGER.error("Exception occurred when invoking: {} on application: {}.", op, app, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.test.integration;

import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Acts as a pass-through filter: every event read from an input stream is
 * forwarded unchanged to a fixed output stream.
 */
public class TestStandaloneIntegrationApplication implements StreamApplication {

  private static final Logger LOGGER = LoggerFactory.getLogger(TestStandaloneIntegrationApplication.class);

  @Override
  public void init(StreamGraph graph, Config config) {
    // The input topic name is supplied through job configuration; the output topic is fixed.
    String sourceTopic = config.get("input.stream.name");
    String sinkTopic = "standaloneIntegrationTestKafkaOutputTopic";
    LOGGER.info("Publishing message to: {}.", sinkTopic);
    graph.getInputStream(sourceTopic).sendTo(graph.getOutputStream(sinkTopic));
  }
}
2 changes: 1 addition & 1 deletion samza-test/src/main/python/configs/kafka.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"kafka_instance_0": "localhost"
},
"kafka_port": 9092,
"kafka_start_cmd": "kafka_2.10-0.10.1.1/bin/kafka-server-start.sh -daemon kafka_2.10-0.10.1.1/config/server.properties",
"kafka_start_cmd": "kafka_2.10-0.10.1.1/bin/kafka-server-start.sh -daemon kafka_2.10-0.10.1.1/config/server.properties --override delete.topic.enable=true",
"kafka_stop_cmd": "kafka_2.10-0.10.1.1/bin/kafka-server-stop.sh",
"kafka_install_path": "deploy/kafka",
"kafka_executable": "kafka_2.10-0.10.1.1.tgz",
Expand Down
4 changes: 1 addition & 3 deletions samza-test/src/main/python/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ def setup_suite():
runtime.set_deployer(name, deployer)
for instance, host in c(name + '_hosts').iteritems():
logger.info('Deploying {0} on host: {1}'.format(instance, host))
deployer.deploy(instance, {
'hostname': host
})
deployer.start(instance, {'hostname': host})

# Setup Samza job deployer.
samza_job_deployer = SamzaJobYarnDeployer({
Expand Down
1 change: 1 addition & 0 deletions samza-test/src/main/python/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ zopkio==0.2.5
requests
kafka-python==1.3.3
Jinja2
kazoo==2.5
123 changes: 123 additions & 0 deletions samza-test/src/main/python/standalone_deployment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import zopkio.adhoc_deployer as adhoc_deployer
from zopkio.runtime import get_active_config as c
from subprocess import PIPE, Popen
import logging
import time
import urllib
import os

# Kafka topics exercised by the standalone integration tests; recreated by setup_suite().
TEST_INPUT_TOPIC = 'standalone_integration_test_kafka_input_topic'
TEST_OUTPUT_TOPIC = 'standalone_integration_test_kafka_output_topic'

logger = logging.getLogger(__name__)
# Maps component name ('zookeeper', 'kafka') to its SSHDeployer; populated by
# _deploy_components() and consumed by teardown_suite().
deployers = {}

def setup_suite():
    """
    Setup method that will be run once by zopkio test_runner before all the integration tests.
    """
    # Fetch and start zookeeper and kafka; their settings are defined in
    # zookeeper.json and kafka.json respectively.
    components = ['zookeeper', 'kafka']
    _download_components(components)

    _deploy_components(components)

    # Recreate the input and output topics so every suite run starts clean.
    zk_connect = 'localhost:2181'
    for topic in (TEST_INPUT_TOPIC, TEST_OUTPUT_TOPIC):
        logger.info("Deleting topic: {0}.".format(topic))
        _delete_kafka_topic(zk_connect, topic)
        logger.info("Creating topic: {0}.".format(topic))
        _create_kafka_topic(zk_connect, topic, 3, 1)

def _download_components(components):
    """
    Download each of :param components into the working directory, skipping any
    whose archive is already cached locally, using the url defined in config.
    """
    for component in components:
        download_url = c('url_{0}'.format(component))
        local_file = os.path.basename(download_url)
        if os.path.exists(local_file):
            logger.debug('Using cached file: {0}.'.format(local_file))
            continue
        logger.info('Downloading {0} from {1}.'.format(component, download_url))
        urllib.urlretrieve(download_url, local_file)

def _deploy_components(components):
    """
    Install and start each of :param components from the binaries in the
    deployment directory, recording its deployer in the module-level map.
    """
    global deployers

    for component in components:
        # All per-component settings come from the zopkio config files.
        deployment_config = {
            'install_path': os.path.join(c('remote_install_path'), c(component + '_install_path')),
            'executable': c(component + '_executable'),
            'post_install_cmds': c(component + '_post_install_cmds', []),
            'start_command': c(component + '_start_cmd'),
            'stop_command': c(component + '_stop_cmd'),
            'extract': True,
            'sync': True,
        }
        deployer = adhoc_deployer.SSHDeployer(component, deployment_config)
        deployers[component] = deployer
        for instance, host in c(component + '_hosts').iteritems():
            logger.info('Deploying {0} on host: {1}'.format(instance, host))
            deployer.start(instance, {'hostname': host})
        # Give the freshly started component a moment to come up before
        # moving on to the next one.
        time.sleep(5)

def _create_kafka_topic(zookeeper_servers, topic_name, partition_count, replication_factor):
    """
    Create a kafka topic with the requested layout.

    :param zookeeper_servers: Comma separated list of zookeeper servers used for setting up kafka consumer connector.
    :param topic_name: name of kafka topic to create.
    :param partition_count: Number of partitions of the kafka topic.
    :param replication_factor: Replication factor of the kafka topic.
    """

    ### Using command line utility to create kafka topic since kafka python API doesn't support configuring partitionCount during topic creation.
    base_dir = os.getcwd()
    # Build the argument vector directly instead of formatting a string and
    # calling split(' '): splitting breaks when base_dir contains spaces.
    create_topic_command = [
        'sh',
        os.path.join(base_dir, 'deploy/kafka/kafka_2.10-0.10.1.1/bin/kafka-topics.sh'),
        '--create',
        '--zookeeper', zookeeper_servers,
        '--replication-factor', str(replication_factor),
        '--partitions', str(partition_count),
        '--topic', topic_name,
    ]
    p = Popen(create_topic_command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
    output, err = p.communicate()
    logger.info("Output from create kafka topic: {0}\nstdout: {1}\nstderr: {2}".format(topic_name, output, err))

def _delete_kafka_topic(zookeeper_servers, topic_name):
    """
    Delete kafka topic defined by the method parameters.

    :param zookeeper_servers: Comma separated list of zookeeper servers used for setting up kafka consumer connector.
    :param topic_name: name of kafka topic to delete.
    """

    base_dir = os.getcwd()
    # Build the argument vector directly instead of formatting a string and
    # calling split(' '): splitting breaks when base_dir contains spaces.
    delete_topic_command = [
        'sh',
        os.path.join(base_dir, 'deploy/kafka/kafka_2.10-0.10.1.1/bin/kafka-topics.sh'),
        '--delete',
        '--zookeeper', zookeeper_servers,
        '--topic', topic_name,
    ]
    logger.info("Deleting topic: {0}.".format(topic_name))
    p = Popen(delete_topic_command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
    output, err = p.communicate()
    logger.info("Output from delete kafka topic: {0}\nstdout: {1}\nstderr: {2}".format(topic_name, output, err))

def teardown_suite():
    """
    Teardown method that will be run once by zopkio test_runner after all the integration tests.
    """
    # Undeploy kafka before zookeeper, since kafka depends on zookeeper.
    for component in ('kafka', 'zookeeper'):
        component_deployer = deployers[component]
        for instance, host in c(component + '_hosts').iteritems():
            component_deployer.undeploy(instance)
29 changes: 29 additions & 0 deletions samza-test/src/main/python/standalone_integration_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# 'License'); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import os

# Absolute directory containing this script, used to resolve sibling modules.
# Named script_dir so the builtin dir() is not shadowed.
script_dir = os.path.dirname(os.path.abspath(__file__))

# zopkio test-suite descriptor: points the runner at the standalone deployment
# lifecycle code, the perf-measurement hooks, the shared config directory and
# the failure-test module.
test = {
    'deployment_code': os.path.join(script_dir, 'standalone_deployment.py'),
    'perf_code': os.path.join(script_dir, 'perf.py'),
    'configs_directory': os.path.join(script_dir, 'configs'),
    'test_code': [
        os.path.join(script_dir, 'tests', 'standalone_failure_tests.py'),
    ],
}
Loading