Merged
32 commits
398d257
Add SqlSegmentsMetadataCache
kfaraz Jan 22, 2025
752664b
Acquire locks, close transactions
kfaraz Jan 27, 2025
e4da41f
Keep unused max partition in cache
kfaraz Jan 28, 2025
0df8a60
Add unit tests, cache only used segment states
kfaraz Jan 28, 2025
d6dcb42
Cleanup javadocs
kfaraz Jan 28, 2025
afe1b5f
More javadocs and cleanup
kfaraz Jan 28, 2025
47424a8
Fix some tests
kfaraz Jan 28, 2025
c0f4bad
Enable more UTs to use segment metadata cache
kfaraz Jan 29, 2025
271a348
Remove forbidden APIs
kfaraz Jan 29, 2025
12dc6f2
Run all ITs with cache enabled
kfaraz Jan 30, 2025
08dc96f
Allow ingestion tests to use segment metadata cache
kfaraz Jan 30, 2025
211f4bf
More tests and cleanup
kfaraz Jan 31, 2025
3337594
Enable cache for all tests temporarily
kfaraz Jan 31, 2025
cf1e7e3
Remove usage of forbidden API
kfaraz Jan 31, 2025
2ecb54c
Fix tests, checkstyle
kfaraz Jan 31, 2025
3533451
Fix tests
kfaraz Jan 31, 2025
68dfb5c
Add more tests
kfaraz Feb 3, 2025
97cbd0b
Merge branch 'master' of github.com:apache/druid into cache_segments_…
kfaraz Feb 3, 2025
cb7d155
Add javadocs and tests
kfaraz Feb 3, 2025
4501d58
More tests
kfaraz Feb 4, 2025
bc160b3
Fix flakiness of unit test
kfaraz Feb 6, 2025
2162fed
Remove timeline from cache, use interval map instead
kfaraz Feb 7, 2025
2ba159e
Fix checkstyle
kfaraz Feb 7, 2025
4bc6a8a
Address review feedback
kfaraz Feb 8, 2025
600cce2
Prepone sync on becoming leader
kfaraz Feb 9, 2025
aa34805
Fix tests with cache
kfaraz Feb 9, 2025
79befed
Add comments
kfaraz Feb 10, 2025
cc5547f
Add more comments
kfaraz Feb 11, 2025
cb1e29e
Update docs for metrics and config
kfaraz Feb 11, 2025
92c8200
Update docs
kfaraz Feb 11, 2025
7a9bd19
Update metrics
kfaraz Feb 11, 2025
6c5655d
Merge branch 'master' of github.com:apache/druid into cache_segments_…
kfaraz Feb 11, 2025
9 changes: 9 additions & 0 deletions docs/configuration/index.md
@@ -1168,6 +1168,15 @@ If autoscaling is enabled, you can set these additional configs:

The `druid.supervisor.idleConfig.*` specification in the Overlord runtime properties defines the default behavior for the entire cluster. See [Idle Configuration in Kafka Supervisor IOConfig](../ingestion/kinesis-ingestion.md#io-configuration) to override it for an individual supervisor.

##### Segment metadata cache (EXPERIMENTAL)

The following properties configure segment metadata caching on the Overlord. Enabling the cache can speed up segment allocation and other metadata operations.

|Property|Description|Default|
|--------|-----------|-------|
|`druid.manager.segments.useCache`|If `true`, segment metadata is cached on the Overlord to speed up metadata operations.|false|
|`druid.manager.segments.pollDuration`|Duration (in ISO 8601 format) between successive syncs of the cache with the metadata store. This property is used only when `druid.manager.segments.useCache` is `true`.|`PT1M` (1 minute)|
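For example, a minimal sketch of enabling the cache in the Overlord runtime properties (the 30-second poll duration is an illustrative value, not a recommendation):

```properties
# Enable the experimental segment metadata cache on the Overlord.
druid.manager.segments.useCache=true

# Sync the cache with the metadata store every 30 seconds (ISO 8601 duration).
# Ignored unless druid.manager.segments.useCache is true.
druid.manager.segments.pollDuration=PT30S
```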

#### Overlord dynamic configuration

The Overlord has dynamic configurations to tune how Druid assigns tasks to workers.
19 changes: 19 additions & 0 deletions docs/operations/metrics.md
@@ -313,6 +313,25 @@ If the JVM does not support CPU time measurement for the current thread, `ingest
|`worker/task/success/count`|Number of tasks that succeeded on an indexer during the emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|
|`worker/task/running/count`|Number of tasks running on an indexer per emission period. This metric is only available if the `WorkerTaskCountStatsMonitor` module is included.|`dataSource`|Varies|

### Segment metadata cache

The following metrics are emitted only when [segment metadata caching](../configuration/index.md#segment-metadata-cache-experimental) is enabled on the Overlord.

|Metric|Description|Dimensions|
|------|-----------|----------|
|`segment/used/count`|Number of used segments currently present in the metadata store.|`dataSource`|
|`segment/unused/count`|Number of unused segments currently present in the metadata store.|`dataSource`|
|`segment/pending/count`|Number of pending segments currently present in the metadata store.|`dataSource`|
|`segment/metadataCache/sync/time`|Number of milliseconds taken for the cache to sync with the metadata store.||
|`segment/metadataCache/deleted`|Total number of segments deleted from the cache during the latest sync.||
|`segment/metadataCache/skipped`|Total number of unparseable segment records that were skipped in the latest sync.||
|`segment/metadataCache/used/stale`|Number of used segments in the cache which are out-of-date and need to be refreshed.|`dataSource`|
|`segment/metadataCache/used/updated`|Number of used segments updated in the cache during the latest sync.|`dataSource`|
|`segment/metadataCache/unused/updated`|Number of unused segments updated in the cache during the latest sync.|`dataSource`|
|`segment/metadataCache/pending/deleted`|Number of pending segments deleted from the cache during the latest sync.|`dataSource`|
|`segment/metadataCache/pending/updated`|Number of pending segments updated in the cache during the latest sync.|`dataSource`|
|`segment/metadataCache/pending/skipped`|Number of unparseable pending segment records that were skipped in the latest sync.|`dataSource`|

## Shuffle metrics (Native parallel task)

The shuffle metrics can be enabled by adding `org.apache.druid.indexing.worker.shuffle.ShuffleMonitor` in `druid.monitoring.monitors`.
@@ -46,13 +46,16 @@
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory;
import org.apache.druid.metadata.segment.cache.NoopSegmentMetadataCache;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -105,6 +108,13 @@ public void setUp()
derbyConnector
);
indexerMetadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
new SqlSegmentMetadataTransactionFactory(
objectMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
new NoopSegmentMetadataCache()
),
objectMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector,
@@ -42,6 +42,8 @@
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory;
import org.apache.druid.metadata.segment.cache.NoopSegmentMetadataCache;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryRunnerTestHelper;
@@ -56,6 +58,7 @@
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
@@ -112,6 +115,13 @@ public void setUp() throws Exception
);

metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
new SqlSegmentMetadataTransactionFactory(
jsonMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
new NoopSegmentMetadataCache()
),
jsonMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector,
@@ -26,6 +26,8 @@
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.JsonConfigurator;
@@ -129,6 +131,14 @@ public ServiceEmitter getEmitter()
{
return new ServiceEmitter("test", "localhost", new NoopEmitter());
}

@Provides
@IndexingService
public DruidLeaderSelector getLeaderSelector()
{
// A provider for DruidLeaderSelector is needed by SqlSegmentMetadataTransactionFactory
return null;
}
}
)
);
@@ -41,6 +41,7 @@
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;

@@ -89,6 +90,7 @@ public DruidOverlord(
final OverlordDutyExecutor overlordDutyExecutor,
@IndexingService final DruidLeaderSelector overlordLeaderSelector,
final SegmentAllocationQueue segmentAllocationQueue,
final SegmentMetadataCache segmentMetadataCache,
final CompactionScheduler compactionScheduler,
final ScheduledBatchTaskManager scheduledBatchTaskManager,
final ObjectMapper mapper,
@@ -141,6 +143,7 @@ public void becomeLeader()
@Override
public void start()
{
segmentMetadataCache.becomeLeader();
segmentAllocationQueue.becomeLeader();
taskMaster.becomeHalfLeader(taskRunner, taskQueue);
}
@@ -150,6 +153,7 @@ public void stop()
{
taskMaster.stopBeingLeader();
segmentAllocationQueue.stopBeingLeader();
segmentMetadataCache.stopBeingLeader();
}
}
);
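The leadership hooks in this diff are symmetric: `start()` brings the segment metadata cache up first, and `stop()` tears it down last, so components that depend on it never outlive it. A self-contained sketch of that last-started, first-stopped ordering (the `LeaderAware` interface and the `run` helper are illustrative stand-ins, not Druid APIs; the real task master uses `becomeHalfLeader`):

```java
import java.util.ArrayList;
import java.util.List;

public class LeaderLifecycle
{
  // Illustrative stand-in for components such as the segment metadata cache,
  // the segment allocation queue, and the task master. Not a Druid API.
  interface LeaderAware
  {
    void becomeLeader();
    void stopBeingLeader();
  }

  public static List<String> run()
  {
    final List<String> events = new ArrayList<>();
    final List<LeaderAware> components = List.of(
        component("segmentMetadataCache", events),
        component("segmentAllocationQueue", events),
        component("taskMaster", events)
    );

    // On gaining leadership, start components in declaration order.
    components.forEach(LeaderAware::becomeLeader);

    // On losing leadership, stop them in reverse order, so the cache that
    // the other components rely on is the last thing to shut down.
    for (int i = components.size() - 1; i >= 0; i--) {
      components.get(i).stopBeingLeader();
    }
    return events;
  }

  private static LeaderAware component(String name, List<String> events)
  {
    return new LeaderAware()
    {
      @Override
      public void becomeLeader()
      {
        events.add("start:" + name);
      }

      @Override
      public void stopBeingLeader()
      {
        events.add("stop:" + name);
      }
    };
  }

  public static void main(String[] args)
  {
    System.out.println(run());
  }
}
```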
@@ -29,8 +29,8 @@
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import java.util.HashSet;
@@ -42,15 +42,15 @@ public class RetrieveSegmentsActionsTest
private static final String UNUSED_V0 = "v0";
private static final String UNUSED_V1 = "v1";

@ClassRule
public static TaskActionTestKit actionTestKit = new TaskActionTestKit();
@Rule
public TaskActionTestKit actionTestKit = new TaskActionTestKit();

private static Task task;
private static Set<DataSegment> expectedUnusedSegments;
private static Set<DataSegment> expectedUsedSegments;

@BeforeClass
public static void setup()
@Before
public void setup()
{
task = NoopTask.create();

@@ -99,6 +99,7 @@ private static DataSegment createSegment(Interval interval, String version)
@Test
public void testRetrieveUsedSegmentsAction()
{
actionTestKit.syncSegmentMetadataCache();
final RetrieveUsedSegmentsAction action =
new RetrieveUsedSegmentsAction(task.getDataSource(), ImmutableList.of(INTERVAL));
final Set<DataSegment> observedUsedSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
@@ -56,6 +56,7 @@
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -93,28 +94,32 @@ public class SegmentAllocateActionTest

private SegmentAllocationQueue allocationQueue;

@Parameterized.Parameters(name = "granularity = {0}, useBatch = {1}, skipSegmentPayloadFetchForAllocation = {2}")
@Parameterized.Parameters(name = "lock={0}, useBatch={1}, useSegmentCache={2}, reduceMetadataIO={3}")
public static Iterable<Object[]> constructorFeeder()
{
// reduceMetadataIO is applicable only with batch allocation
return ImmutableList.of(
new Object[]{LockGranularity.SEGMENT, true, true},
new Object[]{LockGranularity.SEGMENT, true, false},
new Object[]{LockGranularity.SEGMENT, false, false},
new Object[]{LockGranularity.TIME_CHUNK, true, true},
new Object[]{LockGranularity.TIME_CHUNK, true, false},
new Object[]{LockGranularity.TIME_CHUNK, false, false}
new Object[]{LockGranularity.SEGMENT, true, true, true},
new Object[]{LockGranularity.SEGMENT, true, false, false},
new Object[]{LockGranularity.SEGMENT, false, false, false},
new Object[]{LockGranularity.TIME_CHUNK, true, true, true},
new Object[]{LockGranularity.TIME_CHUNK, true, false, false},
new Object[]{LockGranularity.TIME_CHUNK, false, false, false},
new Object[]{LockGranularity.TIME_CHUNK, false, true, false}
);
}

public SegmentAllocateActionTest(
LockGranularity lockGranularity,
boolean useBatch,
boolean useSegmentMetadataCache,
boolean skipSegmentPayloadFetchForAllocation
)
{
this.lockGranularity = lockGranularity;
this.useBatch = useBatch;
this.taskActionTestKit.setSkipSegmentPayloadFetchForAllocation(skipSegmentPayloadFetchForAllocation);
this.taskActionTestKit.setUseSegmentMetadataCache(useSegmentMetadataCache);
}

@Before
@@ -141,9 +146,7 @@ public void tearDown()
@Test
public void testManySegmentsSameInterval_noLineageCheck() throws Exception
{
if (lockGranularity == LockGranularity.SEGMENT) {
return;
}
Assume.assumeTrue(lockGranularity == LockGranularity.TIME_CHUNK);

final Task task = NoopTask.create();
final int numTasks = 2;
@@ -1170,7 +1173,7 @@ public void testSegmentIdMustNotBeReused()
// Allocate another id and ensure that it doesn't exist in the druid_segments table
final SegmentIdWithShardSpec theId =
allocate(task1, DateTimes.nowUtc(), Granularities.NONE, Granularities.ALL, "seq", "3");
Assert.assertNull(coordinator.retrieveSegmentForId(theId.asSegmentId().toString(), true));
Assert.assertNull(coordinator.retrieveSegmentForId(theId.getDataSource(), theId.asSegmentId().toString()));

lockbox.unlock(task1, Intervals.ETERNITY);
}
@@ -59,15 +59,22 @@ public class SegmentAllocationQueueTest

private final boolean reduceMetadataIO;

@Parameterized.Parameters(name = "reduceMetadataIO = {0}")
@Parameterized.Parameters(name = "reduceMetadataIO = {0}, useSegmentCache = {1}")
public static Object[][] getTestParameters()
{
return new Object[][]{{true}, {false}};
return new Object[][]{
{true, true},
{true, false},
{false, true},
{false, false}
};
}

public SegmentAllocationQueueTest(boolean reduceMetadataIO)
public SegmentAllocationQueueTest(boolean reduceMetadataIO, boolean useSegmentMetadataCache)
{
this.reduceMetadataIO = reduceMetadataIO;

taskActionTestKit.setUseSegmentMetadataCache(useSegmentMetadataCache);
}

@Before