33 changes: 26 additions & 7 deletions .travis.yml
@@ -313,6 +313,30 @@ jobs:
script: *run_integration_test
after_failure: *integration_test_diags

- &integration_kafka_index_slow
name: "(Compile=openjdk8, Run=openjdk8) kafka index integration test slow"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=kafka-index-slow' JVM_RUNTIME='-Djvm.runtime=8'
script: *run_integration_test
after_failure: *integration_test_diags

- &integration_kafka_transactional_index
name: "(Compile=openjdk8, Run=openjdk8) transactional kafka index integration test"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=kafka-transactional-index' JVM_RUNTIME='-Djvm.runtime=8'
script: *run_integration_test
after_failure: *integration_test_diags

- &integration_kafka_transactional_index_slow
name: "(Compile=openjdk8, Run=openjdk8) transactional kafka index integration test slow"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=8'
script: *run_integration_test
after_failure: *integration_test_diags

- &integration_query
name: "(Compile=openjdk8, Run=openjdk8) query integration test"
jdk: openjdk8
@@ -341,7 +365,7 @@ jobs:
name: "(Compile=openjdk8, Run=openjdk8) other integration test"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index' JVM_RUNTIME='-Djvm.runtime=8'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=8'
script: *run_integration_test
after_failure: *integration_test_diags
# END - Integration tests for Compile with Java 8 and Run with Java 8
@@ -357,11 +381,6 @@ jobs:
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=perfect-rollup-parallel-batch-index' JVM_RUNTIME='-Djvm.runtime=11'

- <<: *integration_kafka_index
name: "(Compile=openjdk8, Run=openjdk11) kafka index integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=kafka-index' JVM_RUNTIME='-Djvm.runtime=11'

- <<: *integration_query
name: "(Compile=openjdk8, Run=openjdk11) query integration test"
jdk: openjdk8
@@ -380,7 +399,7 @@ jobs:
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk11) other integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=11'
# END - Integration tests for Compile with Java 8 and Run with Java 11

- name: "security vulnerabilities"
23 changes: 22 additions & 1 deletion integration-tests/README.md
@@ -68,7 +68,18 @@ can either be 8 or 11.
Druid's configuration (using Docker) can be overridden by providing `-Doverride.config.path=<PATH_TO_FILE>`.
The file must contain one property per line; the key must start with `druid_` and the format should be snake case.
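For example, a hypothetical override file (these property names are illustrative, following the `druid_` snake-case convention) might contain:

```
druid_storage_type=local
druid_server_http_numThreads=20
```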

## Debugging Druid while running tests
## Tips & tricks for debugging and developing integration tests

### Useful mvn command flags

- `-Dskip.start.docker=true` to skip starting the docker containers. This can save ~3 minutes by skipping building and bringing
up the docker containers (Druid, Kafka, Hadoop, MySQL, ZooKeeper, etc.). Please make sure that you actually do have
these containers already running if using this flag. Additionally, please make sure that the running containers
are in the same state that the setup script (run_cluster.sh) would have brought them up in. See the example invocation after this list.
- `-Dskip.stop.docker=true` to skip stopping and tearing down the docker containers. This can be useful for further
debugging after the integration tests have finished running.
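For example, a sketch of re-running a single test group against an already-running cluster (the `mvn verify -P integration-tests` invocation and group name follow the conventions described earlier in this README):

```bash
# Re-use containers from a previous run and leave them running afterwards for debugging
mvn verify -P integration-tests -Dgroups=kafka-index \
    -Dskip.start.docker=true -Dskip.stop.docker=true
```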

### Debugging Druid while running tests

For your convenience, Druid processes running inside Docker have debugging enabled and the following ports have
been made available to attach your remote debugger (such as via IntelliJ IDEA's Remote Configuration):
Expand Down Expand Up @@ -303,3 +314,13 @@ This will tell the test framework that the test class needs to be constructed us
2) FromFileTestQueryHelper - reads queries with expected results from a file, executes them, and verifies the results using ResultVerifier

Refer to ITIndexerTest as an example of how to use dependency injection. A minimal sketch of such a test class is shown below.
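A minimal sketch of a Guice-injected test class (the class name is illustrative and the package paths are assumed):

```java
import com.google.inject.Inject;
import org.apache.druid.testing.clients.OverlordResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;

@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITExampleIndexTest
{
  // Injected by the Druid test module via the module factory above
  @Inject
  private OverlordResourceTestClient overlordClient;

  @Test
  public void testSomething()
  {
    // injected clients and helpers are available here
  }
}
```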

### Running test methods in parallel
By default, test methods in a test class will be run in sequential order one at a time. Test methods for a given test
class can be set to run in parallel (multiple test methods of each class running at the same time) by excluding
the given class/package from the "AllSerializedTests" test tag section and including it in the "AllParallelizedTests"
test tag section in integration-tests/src/test/resources/testng.xml (sketched below).
Please be mindful when adding tests to the "AllParallelizedTests" test tag that the tests can run in parallel with
other tests from the same class at the same time, i.e. the test must not modify/restart/stop the druid cluster or other dependency containers,
must not use excessive memory (starving other concurrent tasks), and must not modify and/or use any task,
supervisor, or datasource it did not create.

Contributor
Does AllParallelizedTests parallelize tests in a single class? Or can it parallelize across classes? Probably worth mentioning here.

Contributor (Author)
Only "methods inside the test cases are executed in parallel". Updated.
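A sketch of the relevant testng.xml layout (assumed shape, not the verbatim file; the package and class names are illustrative). The `parallel="methods"` setting is what limits parallelism to methods within a single class:

```xml
<suite name="IntegrationTestSuite">
  <!-- Classes here run their test methods sequentially -->
  <test name="AllSerializedTests">
    <packages>
      <package name="org.apache.druid.tests.*"/>
    </packages>
  </test>
  <!-- Classes here run the methods within each class in parallel -->
  <test name="AllParallelizedTests" parallel="methods" thread-count="2">
    <classes>
      <class name="org.apache.druid.tests.parallelized.ITExampleParallelTest"/>
    </classes>
  </test>
</suite>
```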
38 changes: 6 additions & 32 deletions integration-tests/pom.xml
@@ -233,11 +233,6 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
@@ -304,21 +299,6 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${apache.kafka.version}</version>
<exclusions>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
@@ -367,6 +347,8 @@
<id>integration-tests</id>
<properties>
<start.hadoop.docker>false</start.hadoop.docker>
<skip.start.docker>false</skip.start.docker>
<skip.stop.docker>false</skip.stop.docker>
<override.config.path></override.config.path>
<resource.file.dir.path></resource.file.dir.path>
</properties>
@@ -385,6 +367,7 @@
<configuration>
<environmentVariables>
<DRUID_INTEGRATION_TEST_START_HADOOP_DOCKER>${start.hadoop.docker}</DRUID_INTEGRATION_TEST_START_HADOOP_DOCKER>
<DRUID_INTEGRATION_TEST_SKIP_START_DOCKER>${skip.start.docker}</DRUID_INTEGRATION_TEST_SKIP_START_DOCKER>
<DRUID_INTEGRATION_TEST_JVM_RUNTIME>${jvm.runtime}</DRUID_INTEGRATION_TEST_JVM_RUNTIME>
<DRUID_INTEGRATION_TEST_GROUP>${groups}</DRUID_INTEGRATION_TEST_GROUP>
<DRUID_INTEGRATION_TEST_OVERRIDE_CONFIG_PATH>${override.config.path}</DRUID_INTEGRATION_TEST_OVERRIDE_CONFIG_PATH>
@@ -400,6 +383,9 @@
</goals>
<phase>post-integration-test</phase>
<configuration>
<environmentVariables>
<DRUID_INTEGRATION_TEST_SKIP_STOP_DOCKER>${skip.stop.docker}</DRUID_INTEGRATION_TEST_SKIP_STOP_DOCKER>
</environmentVariables>
<executable>${project.basedir}/stop_cluster.sh</executable>
</configuration>
</execution>
@@ -419,12 +405,6 @@
</execution>
</executions>
<configuration>
<properties>
<property>
<name>testrunfactory</name>
<value>org.testng.DruidTestRunnerFactory</value>
</property>
</properties>
<argLine>
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
@@ -477,12 +457,6 @@
</execution>
</executions>
<configuration>
<properties>
<property>
<name>testrunfactory</name>
<value>org.testng.DruidTestRunnerFactory</value>
</property>
</properties>
<argLine>
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
6 changes: 6 additions & 0 deletions integration-tests/run_cluster.sh
@@ -14,6 +14,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# Skip starting docker if flag set (For use during development)
if [ -n "$DRUID_INTEGRATION_TEST_SKIP_START_DOCKER" ] && [ "$DRUID_INTEGRATION_TEST_SKIP_START_DOCKER" == true ]
then
exit 0
fi
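# A matching guard presumably exists in stop_cluster.sh keyed on
# DRUID_INTEGRATION_TEST_SKIP_STOP_DOCKER (hypothetical sketch; that script is
# not part of this diff):
#   if [ "$DRUID_INTEGRATION_TEST_SKIP_STOP_DOCKER" == true ]; then exit 0; fi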

# Cleanup old images/containers
{
for node in druid-historical druid-coordinator druid-overlord druid-router druid-router-permissive-tls druid-router-no-client-auth-tls druid-router-custom-check-tls druid-broker druid-middlemanager druid-zookeeper-kafka druid-metadata-storage druid-it-hadoop;
OverlordResourceTestClient.java
Expand Up @@ -43,6 +43,7 @@
import org.jboss.netty.handler.codec.http.HttpResponseStatus;

import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -156,6 +157,15 @@ public List<TaskResponseObject> getCompleteTasksForDataSource(final String dataS
return getTasks(StringUtils.format("tasks?state=complete&datasource=%s", StringUtils.urlEncode(dataSource)));
}

public List<TaskResponseObject> getUncompletedTasksForDataSource(final String dataSource)
{
List<TaskResponseObject> uncompletedTasks = new ArrayList<>();
uncompletedTasks.addAll(getTasks(StringUtils.format("tasks?state=pending&datasource=%s", StringUtils.urlEncode(dataSource))));
uncompletedTasks.addAll(getTasks(StringUtils.format("tasks?state=running&datasource=%s", StringUtils.urlEncode(dataSource))));
uncompletedTasks.addAll(getTasks(StringUtils.format("tasks?state=waiting&datasource=%s", StringUtils.urlEncode(dataSource))));
return uncompletedTasks;
}

private List<TaskResponseObject> getTasks(String identifier)
{
try {
DruidTestModuleFactory.java
@@ -57,7 +57,6 @@ private static List<? extends Module> getModules()
@Override
public Module createModule(ITestContext context, Class<?> testClass)
{
context.addGuiceModule(DruidTestModule.class, MODULE);
context.addInjector(Collections.singletonList(MODULE), INJECTOR);
return MODULE;
}
KafkaAdminClient.java (new file)
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.testing.utils;

import com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class KafkaAdminClient implements StreamAdminClient
{
private AdminClient adminClient;

public KafkaAdminClient(String kafkaInternalHost)
{
Properties config = new Properties();
config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaInternalHost);
adminClient = AdminClient.create(config);
}

@Override
public void createStream(String streamName, int partitionCount, Map<String, String> tags) throws Exception
{
final short replicationFactor = 1;
final NewTopic newTopic = new NewTopic(streamName, partitionCount, replicationFactor);
final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
// Wait for the topic creation to complete
createTopicsResult.values().get(streamName).get();
}

@Override
public void deleteStream(String streamName) throws Exception
{
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(ImmutableList.of(streamName));
deleteTopicsResult.values().get(streamName).get();
}

/**
* This method can only increase the partition count of {@param streamName} to a final partition
* count of {@param newPartitionCount}.
* If {@param blocksUntilStarted} is set to true, this method will block until the repartitioning
* has started (but not necessarily finished); otherwise, it returns right after issuing the
* repartitioning command.
*/
@Override
public void updatePartitionCount(String streamName, int newPartitionCount, boolean blocksUntilStarted) throws Exception
{
Map<String, NewPartitions> counts = new HashMap<>();
counts.put(streamName, NewPartitions.increaseTo(newPartitionCount));
CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(counts);
if (blocksUntilStarted) {
createPartitionsResult.values().get(streamName).get();
}
}

/**
* Stream state such as active/non-active does not apply to Kafka.
* Returns true since a Kafka stream is always active and can always be written to and read from.
*/
@Override
public boolean isStreamActive(String streamName)
{
return true;
}

Contributor
What does "active stream" mean for Kafka?

Contributor (Author)
It doesn't mean anything, but since this class implements the interface it needs the method. Added comment.

@Override
public int getStreamPartitionCount(String streamName) throws Exception
{
DescribeTopicsResult result = adminClient.describeTopics(ImmutableList.of(streamName));
TopicDescription topicDescription = result.values().get(streamName).get();
return topicDescription.partitions().size();
}

@Override
public boolean verfiyPartitionCountUpdated(String streamName, int oldPartitionCount, int newPartitionCount) throws Exception
{
return getStreamPartitionCount(streamName) == newPartitionCount;
}
}
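A hypothetical usage sketch of the class above (broker address and topic name are made up; the calls throw checked exceptions that a real caller must handle):

KafkaAdminClient admin = new KafkaAdminClient("localhost:9092");
admin.createStream("example_topic", 2, null);         // create a topic with 2 partitions
admin.updatePartitionCount("example_topic", 4, true); // grow to 4 partitions, block until started
int partitionCount = admin.getStreamPartitionCount("example_topic"); // now 4
admin.deleteStream("example_topic");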