Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1175,8 +1175,8 @@ The following properties pertain to segment metadata caching on the Overlord tha

|Property|Description|Default|
|--------|-----------|-------|
|`druid.manager.segments.useCache`|If `true`, segment metadata is cached on the Overlord to speed up metadata operations.|false|
|`druid.manager.segments.pollDuration`|Duration (in ISO 8601 format) between successive syncs of the cache with the metadata store. This property is used only when `druid.manager.segments.useCache` is `true`.|`PT1M` (1 minute)|
|`druid.manager.segments.useCache`|Denotes the usage mode of the segment metadata cache. Possible modes are: (a) `never`: Cache is disabled. (b) `always`: Reads are always done from the cache. Service start-up will be blocked until the cache has synced with the metadata store at least once. Transactions will block until the cache has synced with the metadata store at least once after becoming leader. (c) `ifSynced`: Reads are done from the cache only if it has already synced with the metadata store. This mode does not block service start-up or transactions.|`never`|
|`druid.manager.segments.pollDuration`|Duration (in ISO 8601 format) between successive syncs of the cache with the metadata store. This property is used only when `druid.manager.segments.useCache` is set to `always` or `ifSynced`.|`PT1M` (1 minute)|

#### Overlord dynamic configuration

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
Expand Down Expand Up @@ -113,7 +114,8 @@ public void setUp()
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
new NoopSegmentMetadataCache()
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
objectMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ public void setUp() throws Exception
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
new NoopSegmentMetadataCache()
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
jsonMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ private static DataSegment createSegment(Interval interval, String version)
@Test
public void testRetrieveUsedSegmentsAction()
{
actionTestKit.syncSegmentMetadataCache();
final RetrieveUsedSegmentsAction action =
new RetrieveUsedSegmentsAction(task.getDataSource(), ImmutableList.of(INTERVAL));
final Set<DataSegment> observedUsedSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class TaskActionTestKit extends ExternalResource
private SegmentMetadataCache segmentMetadataCache;
private BlockingExecutorService metadataCachePollExec;

private boolean useSegmentMetadataCache = new SegmentsMetadataManagerConfig(null, null).isUseCache();
private boolean useSegmentMetadataCache = false;
private boolean skipSegmentPayloadFetchForAllocation = new TaskLockConfig().isBatchAllocationReduceMetadataIO();

public TaskLockbox getTaskLockbox()
Expand Down Expand Up @@ -182,13 +182,17 @@ public boolean isBatchAllocationReduceMetadataIO()
private SqlSegmentMetadataTransactionFactory setupTransactionFactory(ObjectMapper objectMapper)
{
metadataCachePollExec = new BlockingExecutorService("test-cache-poll-exec");
SegmentMetadataCache.UsageMode cacheMode
= useSegmentMetadataCache
? SegmentMetadataCache.UsageMode.ALWAYS
: SegmentMetadataCache.UsageMode.NEVER;
segmentMetadataCache = new HeapMemorySegmentMetadataCache(
objectMapper,
Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.seconds(1), useSegmentMetadataCache)),
Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode)),
Suppliers.ofInstance(metadataStorageTablesConfig),
testDerbyConnector,
(poolSize, name) -> new WrappingScheduledExecutorService(name, metadataCachePollExec, false),
new NoopServiceEmitter()
NoopServiceEmitter.instance()
);

final TestDruidLeaderSelector leaderSelector = new TestDruidLeaderSelector();
Expand All @@ -199,7 +203,8 @@ private SqlSegmentMetadataTransactionFactory setupTransactionFactory(ObjectMappe
metadataStorageTablesConfig,
testDerbyConnector,
leaderSelector,
segmentMetadataCache
segmentMetadataCache,
NoopServiceEmitter.instance()
)
{
@Override
Expand All @@ -222,6 +227,6 @@ public void after()
taskActionToolbox = null;
segmentMetadataCache.stopBeingLeader();
segmentMetadataCache.stop();
useSegmentMetadataCache = new SegmentsMetadataManagerConfig(null, null).isUseCache();
useSegmentMetadataCache = false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,13 @@ public TaskToolbox createTaskToolbox(TaskConfig config, Task task, SupervisorMan

private SqlSegmentMetadataTransactionFactory createTransactionFactory()
{
final SegmentMetadataCache.UsageMode cacheMode
= useSegmentMetadataCache
? SegmentMetadataCache.UsageMode.ALWAYS
: SegmentMetadataCache.UsageMode.NEVER;
segmentMetadataCache = new HeapMemorySegmentMetadataCache(
objectMapper,
Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.millis(10), useSegmentMetadataCache)),
Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.millis(10), cacheMode)),
derbyConnectorRule.metadataTablesConfigSupplier(),
derbyConnectorRule.getConnector(),
ScheduledExecutors::fixed,
Expand All @@ -332,7 +336,8 @@ private SqlSegmentMetadataTransactionFactory createTransactionFactory()
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnectorRule.getConnector(),
leaderSelector,
segmentMetadataCache
segmentMetadataCache,
NoopServiceEmitter.instance()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -88,7 +89,8 @@ public void setup()
derby.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
new NoopSegmentMetadataCache()
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
objectMapper,
derby.metadataTablesConfigSupplier().get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.NumberedOverwritePartialShardSpec;
Expand Down Expand Up @@ -136,7 +137,8 @@ public void setup()
tablesConfig,
derbyConnector,
new TestDruidLeaderSelector(),
new NoopSegmentMetadataCache()
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
objectMapper,
tablesConfig,
Expand Down Expand Up @@ -477,7 +479,8 @@ public void testSyncWithUnknownTaskTypesFromModuleNotLoaded()
derby.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
new NoopSegmentMetadataCache()
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
loadedMapper,
derby.metadataTablesConfigSupplier().get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ public void setUp()
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnectorRule.getConnector(),
new TestDruidLeaderSelector(),
new NoopSegmentMetadataCache()
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
jsonMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CompressionUtils;
Expand Down Expand Up @@ -595,7 +596,8 @@ protected void makeToolboxFactory(TestUtils testUtils, ServiceEmitter emitter, b
derby.metadataTablesConfigSupplier().get(),
derbyConnector,
new TestDruidLeaderSelector(),
new NoopSegmentMetadataCache()
NoopSegmentMetadataCache.instance(),
NoopServiceEmitter.instance()
),
objectMapper,
derby.metadataTablesConfigSupplier().get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ druid_indexer_runner_type=remote
druid_auth_basic_common_cacheDirectory=/tmp/authCache/overlord
druid_server_https_crlPath=/tls/revocations.crl

druid_segments_manager_pollDuration=PT5S
druid_segments_manager_useCache=true
druid_manager_segments_pollDuration=PT5S
druid_manager_segments_useCache=always
4 changes: 2 additions & 2 deletions integration-tests/docker/environment-configs/overlord
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ druid_indexer_storage_type=metadata
druid_indexer_runner_type=remote
druid_auth_basic_common_cacheDirectory=/tmp/authCache/overlord
druid_server_https_crlPath=/tls/revocations.crl
druid_segments_manager_pollDuration=PT2S
druid_segments_manager_useCache=true
druid_manager_segments_pollDuration=PT5S
druid_manager_segments_useCache=always
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private Set<DataSegment> doRetrieveUsedSegments(
final Segments visibility
)
{
return inReadWriteDatasourceTransaction(
return inReadOnlyDatasourceTransaction(
dataSource,
transaction -> {
if (visibility == Segments.ONLY_VISIBLE) {
Expand Down Expand Up @@ -283,7 +283,7 @@ public int markAllSegmentsAsUnused(String dataSource)
}

private SegmentTimeline getTimelineForIntervals(
final SegmentMetadataTransaction transaction,
final SegmentMetadataReadTransaction transaction,
final List<Interval> intervals
)
{
Expand Down Expand Up @@ -1500,6 +1500,11 @@ private SegmentIdWithShardSpec getTrueAllocatedId(
allocatedId.getInterval(),
allocatedId.getVersion()
);
log.info(
"Allocated SegmentId[%s] is already in use. Using next ID after max[%s].",
allocatedId.asSegmentId(), unusedMaxId
);

// No unused segment. Just return the allocated id
if (unusedMaxId == null) {
return allocatedId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.common.config.Configs;
import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.joda.time.Period;

/**
Expand All @@ -36,19 +37,19 @@ public class SegmentsMetadataManagerConfig
private final Period pollDuration;

@JsonProperty
private final boolean useCache;
private final SegmentMetadataCache.UsageMode useCache;

@JsonCreator
public SegmentsMetadataManagerConfig(
@JsonProperty("pollDuration") Period pollDuration,
@JsonProperty("useCache") Boolean useCache
@JsonProperty("useCache") SegmentMetadataCache.UsageMode useCache
)
{
this.pollDuration = Configs.valueOrDefault(pollDuration, Period.minutes(1));
this.useCache = Configs.valueOrDefault(useCache, false);
this.useCache = Configs.valueOrDefault(useCache, SegmentMetadataCache.UsageMode.NEVER);
}

public boolean isUseCache()
public SegmentMetadataCache.UsageMode getCacheMode()
{
return useCache;
}
Expand Down
Loading
Loading