17 changes: 15 additions & 2 deletions .travis.yml
@@ -393,11 +393,19 @@ jobs:
script: *run_integration_test
after_failure: *integration_test_diags

- &integration_append_ingestion
name: "(Compile=openjdk8, Run=openjdk8) append ingestion integration test"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-Dgroups=append-ingestion' JVM_RUNTIME='-Djvm.runtime=8'
script: *run_integration_test
after_failure: *integration_test_diags
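# The append-ingestion group gets its own job here (anchored as &integration_append_ingestion and
# reused below via "<<:" for the Java 11 run); it is also added to the excludedGroups list of the
# catch-all "other integration test" job so the new group is not run twice.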

- &integration_tests
name: "(Compile=openjdk8, Run=openjdk8) other integration test"
jdk: openjdk8
services: *integration_test_services
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion' JVM_RUNTIME='-Djvm.runtime=11'
script: *run_integration_test
after_failure: *integration_test_diags
# END - Integration tests for Compile with Java 8 and Run with Java 8
@@ -433,10 +441,15 @@ jobs:
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=realtime-index' JVM_RUNTIME='-Djvm.runtime=11'

- <<: *integration_append_ingestion
name: "(Compile=openjdk8, Run=openjdk11) append ingestion integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=append-ingestion' JVM_RUNTIME='-Djvm.runtime=11'

- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk11) other integration test"
jdk: openjdk8
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage' JVM_RUNTIME='-Djvm.runtime=11'
env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion' JVM_RUNTIME='-Djvm.runtime=11'
# END - Integration tests for Compile with Java 8 and Run with Java 11

- name: "security vulnerabilities"
@@ -31,7 +31,6 @@
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.query.lookup.LookupsState;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.lookup.cache.LookupExtractorFactoryMapContainer;
@@ -95,9 +94,9 @@ private String getFullSegmentsURL(String dataSource)
return StringUtils.format("%sdatasources/%s/segments?full", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
}

private String getLoadStatusURL()
private String getLoadStatusURL(String dataSource)
{
return StringUtils.format("%s%s", getCoordinatorURL(), "loadstatus");
return StringUtils.format("%sdatasources/%s/loadstatus?forceMetadataRefresh=true&interval=1970-01-01/2999-01-01", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
}
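// forceMetadataRefresh=true makes the coordinator read segment metadata straight from the
// metadata store rather than its periodically refreshed cache, and the 1970-01-01/2999-01-01
// interval is simply wide enough to cover everything the tests ingest; this is presumably why
// the stale-cache warning removed further down is no longer needed.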

/** return a list of the segment dates for the specified data source */
@@ -173,11 +172,27 @@ public List<DataSegment> getAvailableSegments(final String dataSource)
}
}

private Map<String, Integer> getLoadStatus()
private Map<String, Integer> getLoadStatus(String dataSource)
{
String url = getLoadStatusURL(dataSource);
Map<String, Integer> status;
try {
StatusResponseHolder response = makeRequest(HttpMethod.GET, getLoadStatusURL());
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.GET, new URL(url)),
responseHandler
).get();

if (response.getStatus().getCode() == HttpResponseStatus.NO_CONTENT.getCode()) {
return null;
}
if (response.getStatus().getCode() != HttpResponseStatus.OK.getCode()) {
throw new ISE(
"Error while making request to url[%s] status[%s] content[%s]",
url,
response.getStatus(),
response.getContent()
);
}

status = jsonMapper.readValue(
response.getContent(), new TypeReference<Map<String, Integer>>()
@@ -191,18 +206,10 @@ private Map<String, Integer> getLoadStatus()
return status;
}

/**
* Warning: This API reads segments from {@link SqlSegmentsMetadataManager} of the Coordinator which
* caches segments in memory and periodically updates them. Hence, there can be a race condition as
* this API implementation compares segments metadata from cache with segments in historicals.
* Particularly, when number of segment changes after the first initial load of the datasource.
* Workaround is to verify the number of segments matches expected from {@link #getSegments(String) getSegments}
* before calling this method (since, that would wait until the cache is updated with expected data)
*/
public boolean areSegmentsLoaded(String dataSource)
{
final Map<String, Integer> status = getLoadStatus();
return (status.containsKey(dataSource) && status.get(dataSource) == 100.0);
final Map<String, Integer> status = getLoadStatus(dataSource);
return (status != null && status.containsKey(dataSource) && status.get(dataSource) == 100.0);
}
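// A minimal usage sketch, assuming the ITRetryUtil helper used by these integration tests:
// a 204 from the coordinator makes getLoadStatus() return null, areSegmentsLoaded() then
// returns false, and the caller simply retries until the datasource reports 100% loaded.
//
//   ITRetryUtil.retryUntilTrue(
//       () -> coordinator.areSegmentsLoaded(fullDatasourceName),
//       "Segment Load"
//   );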

public void unloadSegmentsForDataSource(String dataSource)
@@ -19,6 +19,7 @@

package org.apache.druid.testing.utils;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.model.Container;
import com.github.dockerjava.core.DockerClientBuilder;
@@ -31,6 +32,7 @@
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.TestClient;
import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -51,15 +53,18 @@ public class DruidClusterAdminClient
private static final String ROUTER_DOCKER_CONTAINER_NAME = "/druid-router";
private static final String MIDDLEMANAGER_DOCKER_CONTAINER_NAME = "/druid-middlemanager";

private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
private IntegrationTestingConfig config;

@Inject
DruidClusterAdminClient(
ObjectMapper jsonMapper,
@TestClient HttpClient httpClient,
IntegrationTestingConfig config
)
{
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.config = config;
}
@@ -97,6 +102,7 @@ public void restartMiddleManagerContainer()
public void waitUntilCoordinatorReady()
{
waitUntilInstanceReady(config.getCoordinatorUrl());
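// A 1 ms leading time lets the coordinator mark overshadowed segments (e.g. those replaced by
// compaction) as unused almost immediately; the segment counts expected by the compaction tests
// assume this behavior.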
postDynamicConfig(CoordinatorDynamicConfig.builder().withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(1).build());
}

public void waitUntilHistoricalReady()
@@ -159,4 +165,29 @@ private void waitUntilInstanceReady(final String host)
"Waiting for instance to be ready: [" + host + "]"
);
}

private void postDynamicConfig(CoordinatorDynamicConfig coordinatorDynamicConfig)
{
ITRetryUtil.retryUntilTrue(
() -> {
try {
String url = StringUtils.format("%s/druid/coordinator/v1/config", config.getCoordinatorUrl());
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.POST, new URL(url)).setContent(
"application/json",
jsonMapper.writeValueAsBytes(coordinatorDynamicConfig)
), StatusResponseHandler.getInstance()
).get();

LOG.info("%s %s", response.getStatus(), response.getContent());
return response.getStatus().equals(HttpResponseStatus.OK);
}
catch (Throwable e) {
LOG.error(e, "");
return false;
}
},
"Posting dynamic config after startup"
);
}
}
@@ -41,6 +41,8 @@ public class TestNGGroup

public static final String OTHER_INDEX = "other-index";

public static final String APPEND_INGESTION = "append-ingestion";

public static final String PERFECT_ROLLUP_PARALLEL_BATCH_INDEX = "perfect-rollup-parallel-batch-index";

/**
@@ -87,15 +87,15 @@ public void testAutoCompactionDutySubmitAndVerifyCompaction() throws Exception
verifyQuery(INDEX_QUERIES_RESOURCE);

submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1));
//...compacted into 1 new segment for 1 day. 1 day compacted and 1 day skipped/remains uncompacted. (5 total)
forceTriggerAutoCompaction(5);
//...compacted into 1 new segment for 1 day. 1 day compacted and 1 day skipped/remains uncompacted. (3 total)
forceTriggerAutoCompaction(3);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);

submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST);
//...compacted into 1 new segment for the remaining one day. 2 day compacted and 0 day uncompacted. (6 total)
forceTriggerAutoCompaction(6);
//...compacted into 1 new segment for the remaining one day. 2 day compacted and 0 day uncompacted. (2 total)
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
@@ -119,8 +119,8 @@ public void testAutoCompactionDutyCanUpdateCompactionConfig() throws Exception
submitCompactionConfig(1, SKIP_OFFSET_FROM_LATEST);

// Instead of merging segments, the updated config will split segments!
//...compacted into 10 new segments across 2 days. 5 new segments each day (14 total)
forceTriggerAutoCompaction(14);
//...compacted into 10 new segments across 2 days. 5 new segments each day (10 total)
forceTriggerAutoCompaction(10);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(10, 1);

@@ -154,6 +154,8 @@ public void testAutoCompactionDutyCanDeleteCompactionConfig() throws Exception
@Test
public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception
{
// Set compactionTaskSlotRatio to 0 to prevent any compaction
updateCompactionTaskSlot(0, 0);
loadData(INDEX_TASK);
try (final Closeable ignored = unloader(fullDatasourceName)) {
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
@@ -162,19 +164,7 @@ public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);

// Skips first day. Should only compact one out of two days.
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST);

// Set compactionTaskSlotRatio to 0 to prevent any compaction
updateCompactionTaskSlot(0, 100);
// ...should remain unchanged (4 total)
forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(0, null);
checkCompactionIntervals(intervalsBeforeCompaction);

// Set maxCompactionTaskSlots to 0 to prevent any compaction
updateCompactionTaskSlot(0.1, 0);
// ...should remain unchanged (4 total)
forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
@@ -183,15 +173,15 @@ public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception

// Update compaction slots to be 1
updateCompactionTaskSlot(1, 1);
// One day compacted (1 new segment) and one day remains uncompacted. (5 total)
forceTriggerAutoCompaction(5);
// One day compacted (1 new segment) and one day remains uncompacted. (3 total)
forceTriggerAutoCompaction(3);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"), "14312");
// Run compaction again to compact the remaining day
// Remaining day compacted (1 new segment). Now both days compacted (6 total)
forceTriggerAutoCompaction(6);
// Remaining day compacted (1 new segment). Now both days compacted (2 total)
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
@@ -283,11 +273,11 @@ private void forceTriggerAutoCompaction(int numExpectedSegmentsAfterCompaction)
{
compactionResource.forceTriggerAutoCompaction();
waitForAllTasksToCompleteForDataSource(fullDatasourceName);
verifySegmentsCount(numExpectedSegmentsAfterCompaction);
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
"Segment Compaction"
);
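// Verify the count only after the load wait: by then the compacted segments are loaded and the
// overshadowed originals have been marked unused, so the expected total is stable.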
verifySegmentsCount(numExpectedSegmentsAfterCompaction);
}

private void verifySegmentsCount(int numExpectedSegments)