Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions integration-tests-ex/cases/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,6 @@
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.server.compaction.CompactionSimulateResult;
import org.apache.druid.server.compaction.CompactionStatusResponse;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCompactionConfig;
Expand Down Expand Up @@ -291,7 +293,7 @@ public Map<String, String> getCompactionProgress(String dataSource) throws Excep
return jsonMapper.readValue(response.getContent(), new TypeReference<>() {});
}

public Map<String, String> getCompactionStatus(String dataSource) throws Exception
public AutoCompactionSnapshot getCompactionStatus(String dataSource) throws Exception
{
String url = StringUtils.format("%scompaction/status?dataSource=%s", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
StatusResponseHolder response = httpClient.go(
Expand All @@ -306,8 +308,8 @@ public Map<String, String> getCompactionStatus(String dataSource) throws Excepti
response.getContent()
);
}
Map<String, List<Map<String, String>>> latestSnapshots = jsonMapper.readValue(response.getContent(), new TypeReference<>() {});
return latestSnapshots.get("latestStatus").get(0);
final CompactionStatusResponse latestSnapshots = jsonMapper.readValue(response.getContent(), new TypeReference<>() {});
return latestSnapshots.getLatestStatus().get(0);
}

public CompactionSimulateResult simulateRunOnCoordinator() throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.apache.druid.testing.guice;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
Expand All @@ -32,8 +30,7 @@
import org.apache.druid.guice.annotations.EscalatedClient;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.emitter.core.LoggingEmitter;
import org.apache.druid.java.util.emitter.core.LoggingEmitterConfig;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.CredentialedHttpClient;
import org.apache.druid.java.util.http.client.HttpClient;
Expand Down Expand Up @@ -83,8 +80,10 @@ public HttpClient getHttpClient(

@Provides
@ManageLifecycle
public ServiceEmitter getServiceEmitter(Supplier<LoggingEmitterConfig> config, ObjectMapper jsonMapper)
public ServiceEmitter getServiceEmitter()
{
return new ServiceEmitter("", "", new LoggingEmitter(config.get(), jsonMapper));
// Disabling metric emission since no useful metrics are emitted by the integration testing client
// Use a LoggingEmitter here if needed in the future
return new ServiceEmitter("", "", new NoopEmitter());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ public static <T> void retryUntilEquals(
}
}

System.out.printf("Retries[%d] exhausted.%n", retryCount);

if (lastException != null) {
throw new ISE(
lastException,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,32 +324,12 @@ private void ensureCompactionTaskCount(int expectedCount)
*/
private int getNumberOfCompletedCompactionTasks()
{
List<TaskResponseObject> incompleteTasks = indexer
.getUncompletedTasksForDataSource(fullDatasourceName);
List<TaskResponseObject> completeTasks = indexer
.getCompleteTasksForDataSource(fullDatasourceName);

printTasks(incompleteTasks, "Incomplete");
printTasks(completeTasks, "Complete");

return (int) completeTasks.stream().filter(this::isCompactionTask).count();
}

private void printTasks(List<TaskResponseObject> tasks, String taskState)
Comment thread
Akshat-Jain marked this conversation as resolved.
{
StringBuilder sb = new StringBuilder();
tasks.forEach(
task -> sb.append("{")
.append(task.getType())
.append(", ")
.append(task.getStatus())
.append(", ")
.append(task.getCreatedTime())
.append("}, ")
);
LOG.info("%s Tasks: %s", taskState, sb);
}

/**
* Retries until the total row count is as expected.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2132,17 +2132,17 @@ private void getAndAssertCompactionStatus(
long intervalCountSkipped
) throws Exception
{
Map<String, String> actualStatus = compactionResource.getCompactionStatus(fullDatasourceName);
AutoCompactionSnapshot actualStatus = compactionResource.getCompactionStatus(fullDatasourceName);
Assert.assertNotNull(actualStatus);
Assert.assertEquals(actualStatus.get("scheduleStatus"), scheduleStatus.toString());
MatcherAssert.assertThat(Long.parseLong(actualStatus.get("bytesAwaitingCompaction")), bytesAwaitingCompactionMatcher);
MatcherAssert.assertThat(Long.parseLong(actualStatus.get("bytesCompacted")), bytesCompactedMatcher);
MatcherAssert.assertThat(Long.parseLong(actualStatus.get("bytesSkipped")), bytesSkippedMatcher);
Assert.assertEquals(Long.parseLong(actualStatus.get("segmentCountAwaitingCompaction")), segmentCountAwaitingCompaction);
Assert.assertEquals(Long.parseLong(actualStatus.get("segmentCountCompacted")), segmentCountCompacted);
Assert.assertEquals(Long.parseLong(actualStatus.get("segmentCountSkipped")), segmentCountSkipped);
Assert.assertEquals(Long.parseLong(actualStatus.get("intervalCountAwaitingCompaction")), intervalCountAwaitingCompaction);
Assert.assertEquals(Long.parseLong(actualStatus.get("intervalCountCompacted")), intervalCountCompacted);
Assert.assertEquals(Long.parseLong(actualStatus.get("intervalCountSkipped")), intervalCountSkipped);
Assert.assertEquals(actualStatus.getScheduleStatus(), scheduleStatus);
MatcherAssert.assertThat(actualStatus.getBytesAwaitingCompaction(), bytesAwaitingCompactionMatcher);
MatcherAssert.assertThat(actualStatus.getBytesCompacted(), bytesCompactedMatcher);
MatcherAssert.assertThat(actualStatus.getBytesSkipped(), bytesSkippedMatcher);
Assert.assertEquals(actualStatus.getSegmentCountAwaitingCompaction(), segmentCountAwaitingCompaction);
Assert.assertEquals(actualStatus.getSegmentCountCompacted(), segmentCountCompacted);
Assert.assertEquals(actualStatus.getSegmentCountSkipped(), segmentCountSkipped);
Assert.assertEquals(actualStatus.getIntervalCountAwaitingCompaction(), intervalCountAwaitingCompaction);
Assert.assertEquals(actualStatus.getIntervalCountCompacted(), intervalCountCompacted);
Assert.assertEquals(actualStatus.getIntervalCountSkipped(), intervalCountSkipped);
}
}
Loading