Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ These metrics are emitted by the Druid Coordinator in every run of the correspon
|`serverview/sync/unstableTime`|Time in milliseconds for which the Coordinator has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.|
|`metadatacache/init/time`|Time taken to initialize the coordinator segment metadata cache.||Depends on the number of segments.|
|`segment/schemaCache/refresh/count`|Number of segments for which schema was refreshed in coordinator segment schema cache.|`dataSource`||
|`segment/schemaCache/dataSource/removed`|Emitted when a datasource is removed from the Broker's segment schema cache because its segments have been marked as unused.|`dataSource`||
|`segment/schemaCache/refresh/time`|Time taken to refresh segments in coordinator segment schema cache.|`dataSource`||
|`segment/schemaCache/backfill/count`|Number of segments for which schema was backfilled in the database.|`dataSource`||
|`segment/schemaCache/realtime/count`|Number of realtime segments for which schema is cached.||Depends on the number of realtime segments in the cluster.|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void testInsertImplicitCast()
// Submit the task and wait for the datasource to get loaded
SqlTaskStatus sqlTaskStatus = msqApis.submitTaskSql(queryInline);
cluster.callApi().waitForTaskToSucceed(sqlTaskStatus.getTaskId(), overlord);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);

cluster.callApi().verifySqlQuery(
"SELECT * FROM %s",
Expand Down Expand Up @@ -181,7 +181,7 @@ public void testInsertWithClusteringFromCatalog()
// Submit the task and wait for the datasource to get loaded
SqlTaskStatus sqlTaskStatus = msqApis.submitTaskSql(queryInline);
cluster.callApi().waitForTaskToSucceed(sqlTaskStatus.getTaskId(), overlord);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);

cluster.callApi().verifySqlQuery(
"SELECT * FROM %s",
Expand Down Expand Up @@ -248,7 +248,7 @@ public void testInsertWithClusteringFromQuery()
// Submit the task and wait for the datasource to get loaded
SqlTaskStatus sqlTaskStatus = msqApis.submitTaskSql(queryInline);
cluster.callApi().waitForTaskToSucceed(sqlTaskStatus.getTaskId(), overlord);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);

cluster.callApi().verifySqlQuery(
"SELECT * FROM %s",
Expand Down Expand Up @@ -327,7 +327,7 @@ public void testInsertWithMultiClusteringFromCatalog()
// Submit the task and wait for the datasource to get loaded
SqlTaskStatus sqlTaskStatus = msqApis.submitTaskSql(queryInline);
cluster.callApi().waitForTaskToSucceed(sqlTaskStatus.getTaskId(), overlord);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);

cluster.callApi().verifySqlQuery(
"SELECT * FROM %s",
Expand Down Expand Up @@ -403,7 +403,7 @@ public void testInsertWithMultiClusteringFromQuery()
// Submit the task and wait for the datasource to get loaded
SqlTaskStatus sqlTaskStatus = msqApis.submitTaskSql(queryInline);
cluster.callApi().waitForTaskToSucceed(sqlTaskStatus.getTaskId(), overlord);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);

cluster.callApi().verifySqlQuery(
"SELECT * FROM %s",
Expand Down Expand Up @@ -462,7 +462,7 @@ public void testInsertNonDefinedColumnIntoSealedCatalogTableWithValidationDisabl
SqlTaskStatus sqlTaskStatus = cluster.callApi().onAnyBroker(b -> b.submitSqlTask(sqlQuery));

cluster.callApi().waitForTaskToSucceed(sqlTaskStatus.getTaskId(), overlord);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);

cluster.callApi().verifySqlQuery("SELECT * FROM %s", dataSource, "2022-12-26T12:34:56.000Z,foo");
}
Expand Down Expand Up @@ -540,7 +540,7 @@ public void testInsertWithIncompatibleTypeAssignmentWithValidationDisabled()
SqlTaskStatus sqlTaskStatus = cluster.callApi().onAnyBroker(b -> b.submitSqlTask(sqlQuery));

cluster.callApi().waitForTaskToSucceed(sqlTaskStatus.getTaskId(), overlord);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);

cluster.callApi().verifySqlQuery("SELECT * FROM %s", dataSource, "2022-12-26T12:34:56.000Z,");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedClusterApis;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.EmbeddedHistorical;
Expand All @@ -93,6 +92,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
Expand All @@ -109,6 +109,7 @@
/**
* Embedded mode of integration-tests originally present in {@code ITAutoCompactionTest}.
*/
@Disabled("Disabled due to issues with compaction task not publishing schema to broker")
public class AutoCompactionTest extends CompactionTestBase
{
private static final Logger LOG = new Logger(AutoCompactionTest.class);
Expand Down Expand Up @@ -190,9 +191,6 @@ public class AutoCompactionTest extends CompactionTestBase
private static final Period NO_SKIP_OFFSET = Period.seconds(0);
private static final FixedIntervalOrderPolicy COMPACT_NOTHING_POLICY = new FixedIntervalOrderPolicy(List.of());

private final EmbeddedBroker broker = new EmbeddedBroker()
.addProperty("druid.sql.planner.metadataRefreshPeriod", "PT0.1s");

public static List<CompactionEngine> getEngine()
{
return List.of(CompactionEngine.NATIVE);
Expand Down Expand Up @@ -1855,7 +1853,7 @@ private void waitForCompactionToFinish(int numExpectedSegmentsAfterCompaction)
cluster.callApi().waitForTaskToSucceed(taskId, overlord);
}

cluster.callApi().waitForAllSegmentsToBeAvailable(fullDatasourceName, coordinator);
cluster.callApi().waitForAllSegmentsToBeAvailable(fullDatasourceName, coordinator, broker);
verifySegmentsCount(numExpectedSegmentsAfterCompaction);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.druid.query.aggregation.datasketches.theta.SketchModule;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedClusterApis;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.EmbeddedHistorical;
Expand Down Expand Up @@ -109,8 +108,6 @@ public class CompactionTaskTest extends CompactionTestBase
);

private String fullDatasourceName;
private final EmbeddedBroker broker = new EmbeddedBroker()
.addProperty("druid.sql.planner.metadataRefreshPeriod", "PT0.1s");

@BeforeEach
public void setFullDatasourceName()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public abstract class CompactionTestBase extends EmbeddedClusterTestBase
{
protected final EmbeddedOverlord overlord = new EmbeddedOverlord();
protected final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
protected final EmbeddedBroker broker = new EmbeddedBroker();

@Override
protected EmbeddedDruidCluster createCluster()
Expand All @@ -50,7 +51,7 @@ protected EmbeddedDruidCluster createCluster()
.addServer(overlord)
.addServer(coordinator)
.addServer(new EmbeddedIndexer())
.addServer(new EmbeddedBroker())
.addServer(broker)
.addServer(new EmbeddedHistorical())
.addServer(new EmbeddedRouter());
}
Expand All @@ -71,7 +72,7 @@ protected String runTask(TaskBuilder<?, ?, ?, ?> taskBuilder)
{
final String taskId = IdUtils.getRandomId();
cluster.callApi().runTask(taskBuilder.dataSource(dataSource).withId(taskId), overlord);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);

return taskId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;

/**
* Runs some basic ingestion tests using Coordinator and Overlord at version
Expand Down Expand Up @@ -86,13 +85,6 @@ public void verifyOverlordLeader()
);
}

@Override
@Disabled("Disabled due to flakiness after segment drops")
public void test_runIndexTask_andKillData()
{
super.test_runIndexTask_andKillData();
}

@Override
protected int markSegmentsAsUnused(String dataSource)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class ConcurrentAppendReplaceTest extends EmbeddedClusterTestBase
{
private final EmbeddedOverlord overlord = new EmbeddedOverlord();
private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
private final EmbeddedBroker broker = new EmbeddedBroker();

@Override
protected EmbeddedDruidCluster createCluster()
Expand All @@ -49,7 +50,7 @@ protected EmbeddedDruidCluster createCluster()
.addServer(overlord)
.addServer(coordinator)
.addServer(new EmbeddedIndexer())
.addServer(new EmbeddedBroker())
.addServer(broker)
.addServer(new EmbeddedHistorical());
}

Expand Down Expand Up @@ -94,7 +95,7 @@ public void test_concurrentAppend_toIntervalWithUnusedSegment_usesNewSegmentId()
Assertions.assertEquals("1970-01-01T00:00:00.000ZS", segmentId2.getVersion());
Assertions.assertEquals(0, segmentId2.getPartitionNum());

cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
Assertions.assertEquals(
data1Row,
cluster.runSql("SELECT * FROM %s", dataSource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void test_runIndexTask_andReindexIntoAnotherDatasource(PartitionsSpec par
);

runTask(indexTask, dataSource);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
runQueries(dataSource);

// Re-index into a different datasource, indexing 1 segment per sub-task
Expand All @@ -181,7 +181,7 @@ public void test_runIndexTask_andReindexIntoAnotherDatasource(PartitionsSpec par
);

runTask(reindexTaskSplitBySegment, dataSource2);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource2, coordinator);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource2, coordinator, broker);
runQueries(dataSource2);

// Re-index into a different datasource, indexing 1 file per sub-task
Expand All @@ -207,7 +207,7 @@ public void test_runIndexTask_andReindexIntoAnotherDatasource(PartitionsSpec par
);

runTask(reindexTaskSplitByFile, dataSource3);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource3, coordinator);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource3, coordinator, broker);
runQueries(dataSource3);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedClusterApis;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
Expand Down Expand Up @@ -99,10 +98,7 @@ public void test_runIndexTask_forInlineDatasource()
start = start.plusDays(1);
}

cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
broker.latchableEmitter().waitForEvent(
event -> event.hasDimension(DruidMetrics.DATASOURCE, dataSource)
);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
Assertions.assertEquals(
Resources.InlineData.CSV_10_DAYS,
cluster.runSql("SELECT * FROM %s", dataSource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ public class IngestionSmokeTest extends EmbeddedClusterTestBase
/**
* Broker with a short metadata refresh period.
*/
protected EmbeddedBroker broker = new EmbeddedBroker()
.addProperty("druid.sql.planner.metadataRefreshPeriod", "PT1s");
protected EmbeddedBroker broker = new EmbeddedBroker();

/**
* Event collector used to wait for metric events to occur.
Expand Down Expand Up @@ -177,6 +176,14 @@ public void test_runIndexTask_andKillData()
.hasService("druid/coordinator"),
agg -> agg.hasSumAtLeast(numSegments)
);

// Wait for the Broker to remove this datasource from its schema cache
eventCollector.latchableEmitter().waitForEvent(
event -> event.hasMetricName("segment/schemaCache/dataSource/removed")
.hasDimension(DruidMetrics.DATASOURCE, dataSource)
.hasService("druid/broker")
);

cluster.callApi().verifySqlQuery("SELECT * FROM sys.segments WHERE datasource='%s'", dataSource, "");

// Kill all unused segments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ public EmbeddedDruidCluster createCluster()
coordinator.addProperty("druid.manager.segments.useIncrementalCache", "always");

broker.addProperty("druid.msq.dart.controller.heapFraction", "0.9")
.addProperty("druid.query.default.context.maxConcurrentStages", "1")
.addProperty("druid.sql.planner.metadataRefreshPeriod", "PT0.1s");
.addProperty("druid.query.default.context.maxConcurrentStages", "1");

historical.addProperty("druid.msq.dart.worker.heapFraction", "0.9")
.addProperty("druid.msq.dart.worker.concurrentQueries", "1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

public class MSQKeyStatisticsSketchMergeModeTest extends EmbeddedClusterTestBase
{
private final EmbeddedBroker broker = new EmbeddedBroker();
private final EmbeddedOverlord overlord = new EmbeddedOverlord();
private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
private final EmbeddedIndexer indexer = new EmbeddedIndexer()
Expand All @@ -56,7 +57,7 @@ protected EmbeddedDruidCluster createCluster()
.addServer(overlord)
.addServer(coordinator)
.addServer(indexer)
.addServer(new EmbeddedBroker())
.addServer(broker)
.addServer(new EmbeddedHistorical())
.addServer(new EmbeddedRouter());
}
Expand All @@ -83,7 +84,7 @@ public void testMsqIngestionParallelMerging()

final SqlTaskStatus sqlTaskStatus = msqApis.submitTaskSql(context, queryLocal);
cluster.callApi().waitForTaskToSucceed(sqlTaskStatus.getTaskId(), overlord);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);

cluster.callApi().verifySqlQuery(
"SELECT __time, isRobot, added, delta, deleted, namespace FROM %s",
Expand All @@ -110,7 +111,7 @@ public void testMsqIngestionSequentialMerging()

SqlTaskStatus sqlTaskStatus = msqApis.submitTaskSql(context, queryLocal);
cluster.callApi().waitForTaskToSucceed(sqlTaskStatus.getTaskId(), overlord);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);

cluster.callApi().verifySqlQuery(
"SELECT __time, isRobot, added, delta, deleted, namespace FROM %s",
Expand Down Expand Up @@ -178,7 +179,7 @@ public void testMsqIngestionSequentialMergingWithEmptyStatistics()

SqlTaskStatus sqlTaskStatus = msqApis.submitTaskSql(context, queryLocal);
cluster.callApi().waitForTaskToSucceed(sqlTaskStatus.getTaskId(), overlord);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);

cluster.callApi().verifySqlQuery(
"SELECT __time, isRobot, added, delta, deleted, namespace FROM %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
*/
public class MSQWorkerFaultToleranceTest extends EmbeddedClusterTestBase
{
private final EmbeddedBroker broker = new EmbeddedBroker();
private final EmbeddedOverlord overlord = new EmbeddedOverlord();
private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
private final EmbeddedIndexer indexer = new EmbeddedIndexer()
Expand All @@ -65,7 +66,7 @@ protected EmbeddedDruidCluster createCluster()
.addServer(overlord)
.addServer(coordinator)
.addServer(indexer)
.addServer(new EmbeddedBroker())
.addServer(broker)
.addServer(new EmbeddedHistorical());
}

Expand Down Expand Up @@ -122,7 +123,7 @@ public void test_cancelledWorker_isRetried_ifFaultToleranceIsEnabled() throws Ex

// Verify that the controller task eventually succeeds
cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(), overlord.latchableEmitter());
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);

cluster.callApi().verifySqlQuery(
"SELECT __time, isRobot, added, delta, deleted, namespace FROM %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

public class MultiStageQueryTest extends EmbeddedClusterTestBase
{
private final EmbeddedBroker broker = new EmbeddedBroker();
private final EmbeddedOverlord overlord = new EmbeddedOverlord();
private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
private final EmbeddedIndexer indexer = new EmbeddedIndexer()
Expand All @@ -62,7 +63,7 @@ protected EmbeddedDruidCluster createCluster()
.addServer(overlord)
.addServer(coordinator)
.addServer(indexer)
.addServer(new EmbeddedBroker())
.addServer(broker)
.addServer(new EmbeddedHistorical());
}

Expand All @@ -83,7 +84,7 @@ public void testMsqIngestionAndQuerying()

final SqlTaskStatus taskStatus = msqApis.submitTaskSql(sql);
cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(), overlord.latchableEmitter());
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);

cluster.callApi().verifySqlQuery(
"SELECT __time, isRobot, added, delta, deleted, namespace FROM %s",
Expand Down
Loading
Loading