Conversation


@shanthoosh shanthoosh commented Jun 14, 2018

In the standalone model, a processor can leave and join the group at any point in time. This processor reshuffle, referred to as rebalancing, results in task (work) redistribution among the other available, live processors in the group.

Processor rebalancing in the existing standalone integration tests (JUnit tests) is accomplished through clean shutdown of the processors. However, in real production scenarios, processor rebalancing is typically triggered by unclean shutdown or full garbage collection (GC) of a processor.

To cover those scenarios, this patch adds the following integration tests:

  1. Force killing the leader processor of the group.
  2. Force killing a single follower in the group.
  3. Force killing multiple followers in the group.
  4. Force killing the leader and a follower in the group.
  5. Suspending and resuming the leader of the group.

Since the existing standalone integration tests cover event consumption/production after the rebalancing phase, these new tests only verify the coordination. We'll iterate on this initial suite and add tests whenever necessary.
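
For context, the kind of unclean shutdown and suspension these tests exercise can be simulated from a test harness with POSIX signals. The sketch below is illustrative only; the function names are hypothetical and are not the helpers added in this patch:

import os
import signal

def force_kill_processor(pid):
    ## Simulates an unclean shutdown: SIGKILL gives the processor no chance to leave the group cleanly.
    os.kill(pid, signal.SIGKILL)

def suspend_processor(pid):
    ## Simulates a long pause (e.g. a full GC): SIGSTOP freezes the process so it stops heartbeating.
    os.kill(pid, signal.SIGSTOP)

def resume_processor(pid):
    ## Resumes a previously suspended processor.
    os.kill(pid, signal.SIGCONT)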

@shanthoosh
Contributor Author

@vjagadish1989 @xinyuiscool
Please take a look when you get a chance.

@shanthoosh shanthoosh force-pushed the standalone_failure_tests branch from 1bdb1d5 to 8db964f on June 15, 2018 02:40

shanthoosh commented Jun 15, 2018

Verification:

i=0
while [ $i -lt 150 ]; do
    i=$((i + 1))
    echo "Run $i"
    ./bin/integration-tests.sh /tmp/samza-tests/ standalone-integration-tests --nopassword >> ~/test-logs-runTests-1_26
done

Result:

grep -i 'passed' ~/test-logs-runTests-1_26 | wc -l 
750

grep -i 'failed' ~/test-logs-runTests-1_26 | wc -l 
0

Though the tests were run 150 times, the result contains 750 'passed' lines, since each run prints 'passed' five times (once per test), like the following:

2018-06-15 10:28:30,870 zopkio.test_runner [INFO] test_kill_leader----passed
2018-06-15 10:28:30,870 zopkio.test_runner [INFO] test_kill_multiple_followers----passed
2018-06-15 10:28:30,870 zopkio.test_runner [INFO] test_kill_leader_and_a_follower----passed
2018-06-15 10:28:30,870 zopkio.test_runner [INFO] test_kill_one_follower----passed
2018-06-15 10:28:30,871 zopkio.test_runner [INFO] test_pause_resume_leader----passed


@xinyuiscool xinyuiscool left a comment


Minor suggestions.

*
* This runner class is built for standalone failure tests and is not recommended for general use.
*/
public class LocalApplicationRunnerMain {
Contributor

Please be specific about the application name and add 'Test' to it.

Contributor Author

Done.

/**
* Acts as a pass-through filter for all the events from an input stream.
*/
public class PassThroughStreamApplication implements StreamApplication {
Contributor

Please use a real app name, like StandaloneIntegrationTestKafkaApp or something

Contributor Author

Done.


logger = logging.getLogger(__name__)

class StreamProcessor:
Contributor

Rename to StreamProcessorLauncher?

job_model = zk_client.get_latest_job_model()
for processor_id, deployer in processors.iteritems():
    assert processor_id in job_model['containers'], 'Processor id: {0} does not exist in JobModel: {1}.'.format(processor_id, job_model)
assert leader_processor_id not in job_model['containers'], 'Leader processor: {0} exists in JobModel: {1}.'.format(leader_processor_id, job_model)
Contributor

Check also that we have a new leader id.
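
For illustration, that check might look like the following sketch, reusing the zk_client helper used elsewhere in these tests:

new_leader_processor_id = zk_client.get_leader_processor_id()
assert new_leader_processor_id is not None, 'No leader was elected after the old leader was killed.'
assert new_leader_processor_id != leader_processor_id, 'Leader did not change after killing the old leader: {0}.'.format(leader_processor_id)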

Contributor Author

Done.


job_model = zk_client.get_latest_job_model()
for processor_id, deployer in processors.iteritems():
    assert processor_id in job_model['containers'], 'Processor id: {0} does not exist in JobModel: {1}.'.format(processor_id, job_model)
Contributor

check the leader is the same.
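
A sketch of that check, again assuming the zk_client helpers used in this suite:

assert zk_client.get_leader_processor_id() == leader_processor_id, 'Leader changed unexpectedly; expected it to remain {0}.'.format(leader_processor_id)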

Contributor Author

Done.


@vjagadish1989 vjagadish1989 left a comment


approved, thanks!

import urllib
import os

TEST_INPUT_TOPIC = 'standaloneIntegrationTestKafkaInputTopic'
Contributor

nit: use underscores instead of camel-case for Kafka topics
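
For example, the topic constants could become (the exact names here are illustrative):

TEST_INPUT_TOPIC = 'standalone_integration_test_kafka_input_topic'
TEST_OUTPUT_TOPIC = 'standalone_integration_test_kafka_output_topic'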

Contributor Author

Done.

output, err = p.communicate()
logger.info("Output from delete kafka topic: {0}\nstdout: {1}\nstderr: {2}".format(topic_name, output, err))

def setup_suite():
Contributor

nit: move setup_suite to the top

Contributor Author

Done.

## Create input and output topics.
for topic in [TEST_INPUT_TOPIC, TEST_OUTPUT_TOPIC]:
    logger.info("Creating topic: {0}.".format(topic))
    _create_kafka_topic('localhost:2181', topic, 3, 1)
Contributor

We should start with a "clean slate" each time we start the suite and not rely on teardown being invoked reliably. For example, you could clear the Kafka directory during startup or use a unique topic name.
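
A minimal sketch of such a cleanup at the top of setup_suite; the data directories listed here are assumptions, since the actual paths depend on the Kafka/ZooKeeper configs the suite uses:

import os
import shutil

## Assumed locations of the local broker/ZooKeeper data; adjust to the paths the suite actually configures.
DATA_DIRS = ['/tmp/kafka-logs', '/tmp/zookeeper']

def _clean_data_dirs():
    ## Remove any state left behind by a previous run so the suite starts from a clean slate.
    for data_dir in DATA_DIRS:
        if os.path.exists(data_dir):
            shutil.rmtree(data_dir)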

Contributor Author

Makes sense. Clearing the Kafka directory during startup.

pids = []
if len(full_output) > 0:
    pids = [int(pid_str) for pid_str in full_output.split('\n') if pid_str.isdigit()]
return pids
Contributor

Does this return a single "pid" or a list of "pid"s? If there is one pid per "StreamProcessor", it would be simpler to return a single value here.
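
One possible shape of that simplification, as a sketch (the helper name is hypothetical; full_output is the ps output already used by this function):

def _parse_single_pid(full_output, process_name):
    ## Each StreamProcessor runs as a single JVM process, so at most one pid is expected.
    pids = [int(pid_str) for pid_str in full_output.split('\n') if pid_str.isdigit()]
    assert len(pids) <= 1, 'Expected at most one pid for {0}, found: {1}.'.format(process_name, pids)
    return pids[0] if pids else None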

event.wait(GROUP_COORDINATION_TIMEOUT * 2)

job_model = zk_client.get_latest_job_model()
for processor_id, deployer in processors.iteritems():
Contributor

you get better encapsulation by wrapping this up into zk_client.get_processors() instead of duplicating this here.
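
A sketch of what that helper on the ZooKeeper client could look like (the method name comes from the comment above; the JobModel layout matches the assertions in these tests):

def get_processors(self):
    ## Returns the processor ids present in the containers of the latest JobModel.
    job_model = self.get_latest_job_model()
    return job_model['containers'].keys()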

Contributor Author

Done.

__pump_messages_into_input_topic()
processors = __setup_processors()

leader_processor_id = zk_client.get_leader_processor_id()
Contributor

I'd love to validate the following (see the sketch below):
1. Before: there are 3 processors in the old JobModel; after killing one follower, there are 2 processors in the new JobModel.
2. There should be no unassigned partition. We should ensure this invariant always holds.
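
A sketch of the first check, using the 'containers' field already asserted on in these tests (the processor counts mirror the comment above):

job_model_before = zk_client.get_latest_job_model()
assert len(job_model_before['containers']) == 3, 'Expected 3 processors in the JobModel before the kill.'

## ... force kill one follower and wait for the rebalance to settle ...

job_model_after = zk_client.get_latest_job_model()
assert len(job_model_after['containers']) == 2, 'Expected 2 processors in the JobModel after the kill.'

The no-unassigned-partition check would additionally walk the task assignments nested inside each container entry; that part of the JobModel layout isn't shown in this diff, so it is left out of the sketch.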

Contributor Author

Agree. Added both assertions in all of the integration tests.

break

event = threading.Event()
zk_client.watch_job_model(job_model_watch(event = event, expected_processors=processors.keys()))
Contributor

while we validate the expected set of processors, it'd also be nice to validate their assignments

## Verifications after leader was suspended.
job_model = zk_client.get_latest_job_model()
for processor_id, deployer in processors.iteritems():
    assert processor_id in job_model['containers'], 'Processor id: {0} does not exist in containerModel: {1}.'.format(processor_id, job_model['containers'])
Contributor

This asserts that all processors except the leader are in the group. Should we also assert that the leader is not in the group?
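
That additional assertion might look like the following sketch, mirroring the leader check used in the kill-leader test above:

assert leader_processor_id not in job_model['containers'], 'Suspended leader: {0} still exists in containerModel: {1}.'.format(leader_processor_id, job_model['containers'])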

Contributor Author

Done.


@vjagadish1989 vjagadish1989 left a comment


approved. modulo previous comments.


@shanthoosh shanthoosh left a comment


Thanks for the review.

Done with all the comments.
