
Add Integration Test for functionality of kinesis ingestion#9576

Merged
jon-wei merged 24 commits into apache:master from maytasm:IMPLY-2223
Apr 3, 2020

Conversation

@maytasm
Contributor

@maytasm maytasm commented Mar 28, 2020

Add Integration Test for functionality of kinesis ingestion

Description

The new set of integration tests for Kinesis follows the same Bring Your Own Cloud (BYOC) concept that the S3, GCS, and Azure integration tests use (#9501). Basically, anyone running these tests has to provide their own Kinesis credentials in a conf file and pass that file to mvn using -Doverride.config.path
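A sketch of what that looks like in practice (the property keys below are illustrative placeholders, not the exact keys the Kinesis tests read):

```shell
# Write a hypothetical override file; real runs would put Kinesis
# credentials and endpoint settings here (key names are illustrative).
cat > /tmp/kinesis-it-override.conf <<'EOF'
druid_kinesis_accessKey=YOUR_ACCESS_KEY
druid_kinesis_secretKey=YOUR_SECRET_KEY
EOF

# Then pass the file to the build, e.g.:
#   mvn verify -Doverride.config.path=/tmp/kinesis-it-override.conf
```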

Added the following integration tests for Kinesis ingestion functionality:

  1. Functional tests when Druid and Kinesis are in a stable state
  • legacy parser
  • inputFormat
  • taskCount greater than 1
  2. Functional tests when Druid is in an unstable state
  • losing nodes
  • stop/start supervisor
  3. Functional tests when Kinesis is in an unstable state
  • adding shards (resharding)
  • removing shards (resharding)

To verify ingestion:

  • Kinesis lag should be minimal; the consumer should be able to pull off the stream at a rate comparable to the producer.
  • Realtime queries work from the indexing tasks.
  • Queries work when reading from historical segments (after handoff).
  • Queries return the expected count/value/etc.
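The count check in particular has to be retried, since realtime ingestion and segment handoff are eventually consistent. A minimal sketch of that retry loop (`run_count_query` is a stand-in for however the tests actually query Druid, not the tests' real helper):

```python
# Illustrative retry loop for the "queries return expected count" check.
# `run_count_query` is a stand-in callable for querying Druid.
def wait_for_expected_count(run_count_query, expected, attempts=30, sleep=lambda: None):
    """Retry a COUNT(*)-style query until it matches, since realtime
    ingestion and segment handoff are eventually consistent."""
    last = None
    for _ in range(attempts):
        last = run_count_query()
        if last == expected:
            return True
        sleep()
    raise AssertionError(f"expected {expected} rows, last saw {last}")
```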

Added integration test infrastructure/helpers:

  • Data generator - generates data to a Kinesis stream (Kafka can also be refactored to use this later)
  • DruidAdminClient - to control the integration test's Druid Docker cluster
  • KinesisAdminClient - create stream, delete stream, etc.
  • Since tests can fail or be killed unexpectedly, added a druid-ci-expire-after tag to all streams created by the integration tests. The value of this tag is a timestamp that a lambda function can use to remove unused streams (this makes cleanup easier).
  • Also made debugging integration tests easier by automatically enabling debug mode and exposing a debug port on the integration test's Druid Docker processes.
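A cleanup job built on that tag could look roughly like this. The `druid-ci-expire-after` tag name comes from this PR, but the epoch-millis tag value format and the client interface (shaped like boto3's kinesis client responses) are assumptions:

```python
# Hypothetical cleanup sketch (e.g. for a scheduled lambda). The tag name
# matches this PR; the epoch-millis value format and client shape are assumed.
STREAM_EXPIRE_TAG = "druid-ci-expire-after"

def is_expired(tag_value, now_ms):
    """True if the expire-after timestamp (assumed epoch millis) has passed."""
    try:
        return int(tag_value) < now_ms
    except (TypeError, ValueError):
        return False  # malformed tag: leave the stream alone

def find_expired_streams(client, now_ms):
    """Return names of test streams whose expire tag is in the past.

    `client` only needs list_streams() -> {"StreamNames": [...]} and
    list_tags_for_stream(StreamName=...) -> {"Tags": [{"Key": ..., "Value": ...}]}.
    """
    expired = []
    for name in client.list_streams()["StreamNames"]:
        for tag in client.list_tags_for_stream(StreamName=name)["Tags"]:
            if tag["Key"] == STREAM_EXPIRE_TAG and is_expired(tag["Value"], now_ms):
                expired.append(name)
    return expired
```

A real job would follow this with delete_stream calls and handle pagination; this only shows the tag-driven selection.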

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths.
  • added integration tests.
  • been tested in a test Druid cluster.

@maytasm maytasm changed the title Add Integration Test for functionality of kinesis ingestion [WIP] Add Integration Test for functionality of kinesis ingestion Mar 28, 2020
@maytasm maytasm changed the title [WIP] Add Integration Test for functionality of kinesis ingestion Add Integration Test for functionality of kinesis ingestion Mar 30, 2020
Contributor

@suneet-s suneet-s left a comment

Super cool! I hope to use the DruidAdminClient to automate some of these restart tests

Druid's configuration (using Docker) can be overridden by providing -Doverride.config.path=<PATH_TO_FILE>.
The file must contain one property per line; the key must start with `druid_` and use the snake-case format.

## Debugging Druid while running tests
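For reference, "debug mode" on a JVM process usually means attaching a JDWP agent; a typical flag (illustrative, not necessarily the exact option or port the test harness sets) looks like:

```
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005
```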
Contributor

🎉


FILE_CHECK_IF_RAN=/tls/server.key
if [ -f "$FILE_CHECK_IF_RAN" ]; then
echo "Script was ran already. Skip running again."
Contributor

nit: No need to change if everything else looks good. If I saw the log line as is, it's a little ambiguous - which script? what's the impact of skipping running again?

Suggested change
echo "Script was ran already. Skip running again."
echo "Using existing tls keys since /tls/server.key exists - skipping generation of all certs. To generate certs, delete this file"

Contributor Author

Done

Comment thread: integration-tests/README.md (Outdated)
# Run all integration tests that have been verified to work against a quickstart cluster.
mvn verify -P int-tests-config-file -Dgroups=quickstart-compatible
```
>>>>>>> upstream/master
Contributor

Looks like something from a conflict

Contributor Author

Oops. Good catch. Removed

event.put("robot", "false");
event.put("anonymous", "false");
event.put("namespace", "article");
event.put("continent", "North Americ");
Contributor

North Americ -> North America

Contributor Author

Done

{
private static final Logger LOG = new Logger(AbstractKafkaIndexerTest.class);
private static final int KINESIS_SHARD_COUNT = 2;
private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
Contributor

How is the expire tag used?

Contributor

Ah, nevermind, just saw that part of the description

Contributor

Can you add a comment here explaining what it's used for?

Contributor Author

It's to help people clean up the test streams if the IT cleanup method fails or didn't run (this shouldn't normally happen, but can, such as when the test unexpectedly terminates midway). Added the comment.

int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
// Wait for kinesis stream to finish resharding
Contributor

For the resharding test, I think you'll want to have longer timers for the event generation, with only 3s here I think it's maybe possible that AWS doesn't actually begin the resharding until you've already finished this second phase. Maybe 30s is better.

Or maybe it could check for the stream status becoming UPDATING and start the second phase then.

Contributor Author

@maytasm maytasm Apr 2, 2020

Changed the logic to:

  • after issuing reshard call
  • do DescribeStream polling for an updating status or an active status with the final expected number of shards
  • begin second phase when ^ true
  • check that the stream has active status (no need to check the number of shards, since earlier we already checked for "updating status or an active status with the final expected number of shards"; hence if it is active now, it must be the active state after resharding)
  • begin third phase when ^ true
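That polling step can be sketched as follows (names are illustrative; `describe` stands in for a DescribeStream call against the real KinesisAdminClient, returning a status string and a shard count):

```python
import time

# Sketch of the polling scheme described above: wait until resharding has
# started (UPDATING), or has already finished (ACTIVE with the final count).
def wait_for_reshard_started(describe, expected_shards, timeout_s=300,
                             poll_s=1, sleep=time.sleep):
    """Block until the stream is UPDATING, or already ACTIVE with the final
    expected shard count (in case resharding finished before our first poll)."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        status, shards = describe()
        if status == "UPDATING" or (status == "ACTIVE" and shards == expected_shards):
            return status
        sleep(poll_s)
    raise TimeoutError("stream never started resharding")
```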

Contributor Author

From running locally, I can see that resharding takes around 30000-40000ms (3-4 mins). This means that after issuing the reshard call, when we check for "updating status or an active status with the final expected number of shards" immediately afterwards, it will most likely be the "updating" status that returns true (rather than "active status with the final expected number of shards"). I am only including the "active status with the final expected number of shards" check in case the reshard finishes by the time we do the check (most likely won't happen).

);
// Start generating remaining data (after resharding)
wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
// Verify supervisor is healthy after suspension
Contributor

Suggest having a supervisor health check as well before the resharding occurs, so that the resharding happens while the supervisor is running

Contributor Author

Done

this.queryHelper.testQueriesFromString(querySpec, 2);
LOG.info("Shutting down supervisor");
indexer.shutdownSupervisor(supervisorId);
// wait for all kafka indexing tasks to finish
Contributor

kafka -> kinesis

Contributor Author

Done

}

@Test
public void testKineseIndexDataWithLegacyParserStableState() throws Exception
Contributor

In the test names, Kinese -> Kinesis here and elsewhere

Contributor Author

Done

@jon-wei jon-wei merged commit 1852bf3 into apache:master Apr 3, 2020
jihoonson pushed a commit that referenced this pull request Apr 24, 2020
* backport Add Integration Test for functionality of kinesis ingestion (#9576)

* backport Add integration tests for kafka ingestion (#9724)

* resolve merge conflict

* integration test cluster prop change to support parallel
JulianJaffePinterest pushed a commit to JulianJaffePinterest/druid that referenced this pull request Jun 12, 2020

* kinesis IT

* Kinesis IT

* Kinesis IT

* Kinesis IT

* Kinesis IT

* Kinesis IT

* Kinesis IT

* Kinesis IT

* Kinesis IT

* Kinesis IT

* Kinesis IT

* Kinesis IT

* Kinesis IT

* Kinesis IT

* Kinesis IT

* fix kinesis timeout

* Kinesis IT

* Kinesis IT

* fix checkstyle

* Kinesis IT

* address comments

* fix checkstyle
@clintropolis clintropolis added this to the 0.19.0 milestone Jun 26, 2020