Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
Expand Down Expand Up @@ -104,16 +103,16 @@ public int getSqlMetadataMaxRetry()
}
};
taskLockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator);
segmentSchemaCache = new SegmentSchemaCache(new NoopServiceEmitter());
segmentSchemaCache = new SegmentSchemaCache(NoopServiceEmitter.instance());
segmentsMetadataManager = new SqlSegmentsMetadataManager(
objectMapper,
Suppliers.ofInstance(new SegmentsMetadataManagerConfig()),
Suppliers.ofInstance(metadataStorageTablesConfig),
testDerbyConnector,
segmentSchemaCache,
CentralizedDatasourceSchemaConfig.create()
CentralizedDatasourceSchemaConfig.create(),
NoopServiceEmitter.instance()
);
final ServiceEmitter noopEmitter = new NoopServiceEmitter();
final TaskLockConfig taskLockConfig = new TaskLockConfig()
{
@Override
Expand All @@ -137,10 +136,10 @@ public long getBatchAllocationWaitTime()
taskLockbox,
taskLockConfig,
metadataStorageCoordinator,
noopEmitter,
NoopServiceEmitter.instance(),
ScheduledExecutors::fixed
),
noopEmitter,
NoopServiceEmitter.instance(),
EasyMock.createMock(SupervisorManager.class),
objectMapper
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,15 @@ public void setUpIngestionTestBase() throws IOException
segmentSchemaManager,
CentralizedDatasourceSchemaConfig.create()
);
segmentSchemaCache = new SegmentSchemaCache(new NoopServiceEmitter());
segmentSchemaCache = new SegmentSchemaCache(NoopServiceEmitter.instance());
segmentsMetadataManager = new SqlSegmentsMetadataManager(
objectMapper,
SegmentsMetadataManagerConfig::new,
derbyConnectorRule.metadataTablesConfigSupplier(),
derbyConnectorRule.getConnector(),
segmentSchemaCache,
CentralizedDatasourceSchemaConfig.create()
CentralizedDatasourceSchemaConfig.create(),
NoopServiceEmitter.instance()
);
lockbox = new TaskLockbox(taskStorage, storageCoordinator);
segmentCacheManagerFactory = new SegmentCacheManagerFactory(TestIndex.INDEX_IO, getObjectMapper());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
Expand Down Expand Up @@ -55,23 +54,6 @@ public static DataSourcesSnapshot fromUsedSegments(
return new DataSourcesSnapshot(CollectionUtils.mapValues(dataSources, DruidDataSource::toImmutableDruidDataSource));
}

/**
 * Builds a snapshot from pre-built timelines of used segments.
 * <p>
 * Each datasource's timeline is flattened back into a {@link DruidDataSource}
 * (carrying {@code dataSourceProperties}) so the snapshot holds both the
 * immutable datasource view and the original timelines.
 *
 * @param usedSegmentsTimelinesPerDataSource timeline of used segments, keyed by datasource name
 * @param dataSourceProperties               properties attached to every rebuilt datasource
 */
public static DataSourcesSnapshot fromUsedSegmentsTimelines(
    Map<String, SegmentTimeline> usedSegmentsTimelinesPerDataSource,
    ImmutableMap<String, String> dataSourceProperties
)
{
  final Map<String, ImmutableDruidDataSource> dataSourcesWithAllUsedSegments =
      Maps.newHashMapWithExpectedSize(usedSegmentsTimelinesPerDataSource.size());
  for (Map.Entry<String, SegmentTimeline> entry : usedSegmentsTimelinesPerDataSource.entrySet()) {
    final DruidDataSource dataSource = new DruidDataSource(entry.getKey(), dataSourceProperties);
    // Flatten the timeline: every segment object it holds goes into the datasource.
    entry.getValue().iterateAllObjects().forEach(dataSource::addSegment);
    dataSourcesWithAllUsedSegments.put(entry.getKey(), dataSource.toImmutableDruidDataSource());
  }
  return new DataSourcesSnapshot(dataSourcesWithAllUsedSegments, usedSegmentsTimelinesPerDataSource);
}

private final Map<String, ImmutableDruidDataSource> dataSourcesWithAllUsedSegments;
private final Map<String, SegmentTimeline> usedSegmentsTimelinesPerDataSource;
private final ImmutableSet<DataSegment> overshadowedSegments;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.segment.SchemaPayload;
import org.apache.druid.segment.SegmentMetadata;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
Expand Down Expand Up @@ -164,6 +166,7 @@ long nanosElapsedFromInitiation()
private final Supplier<MetadataStorageTablesConfig> dbTables;
private final SQLMetadataConnector connector;
private final SegmentSchemaCache segmentSchemaCache;
private final ServiceEmitter serviceEmitter;
private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;

/**
Expand Down Expand Up @@ -251,7 +254,8 @@ public SqlSegmentsMetadataManager(
Supplier<MetadataStorageTablesConfig> dbTables,
SQLMetadataConnector connector,
SegmentSchemaCache segmentSchemaCache,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
ServiceEmitter serviceEmitter
)
{
this.jsonMapper = jsonMapper;
Expand All @@ -260,6 +264,7 @@ public SqlSegmentsMetadataManager(
this.connector = connector;
this.segmentSchemaCache = segmentSchemaCache;
this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
this.serviceEmitter = serviceEmitter;
}

/**
Expand Down Expand Up @@ -639,7 +644,7 @@ public boolean markSegmentAsUsed(final String segmentId)
{
try {
int numUpdatedDatabaseEntries = connector.getDBI().withHandle(
(Handle handle) -> handle
handle -> handle
.createStatement(StringUtils.format("UPDATE %s SET used=true, used_status_last_updated = :used_status_last_updated WHERE id = :id", getSegmentsTable()))
.bind("id", segmentId)
.bind("used_status_last_updated", DateTimes.nowUtc().toString())
Expand All @@ -650,7 +655,7 @@ public boolean markSegmentAsUsed(final String segmentId)
// segment into the respective data source, because we don't have it fetched from the database. It's probably not
// worth complicating the implementation and making two database queries just to add the segment because it will
// be anyway fetched during the next poll(). Segment putting that is done in the bulk markAsUsed methods is a nice
// to have thing, but doesn't formally affects the external guarantees of SegmentsMetadataManager class.
// to have thing, but doesn't formally affect the external guarantees of SegmentsMetadataManager class.
return numUpdatedDatabaseEntries > 0;
}
catch (RuntimeException e) {
Expand Down Expand Up @@ -1031,7 +1036,6 @@ private void doPoll()
private void doPollSegments()
{
final Stopwatch stopwatch = Stopwatch.createStarted();
log.info("Starting polling of segment table.");

// Some databases such as PostgreSQL require auto-commit turned off
// to stream results back, enabling transactions disables auto-commit
Expand Down Expand Up @@ -1060,22 +1064,19 @@ private void doPollSegments()
"Unexpected 'null' when polling segments from the db, aborting snapshot update."
);

if (segments.isEmpty()) {
log.info("No segments found in the database!");
} else {
log.info(
"Polled and found [%,d] segments in the database in [%,d] ms.",
segments.size(), stopwatch.millisElapsed()
);
}
stopwatch.stop();
emitMetric("segment/poll/time", stopwatch.millisElapsed());
log.info(
"Polled and found [%,d] segments in the database in [%,d]ms.",
segments.size(), stopwatch.millisElapsed()
);

createDatasourcesSnapshot(segments);
}

private void doPollSegmentAndSchema()
{
final Stopwatch stopwatch = Stopwatch.createStarted();
log.info("Starting polling of segment and schema table.");

ImmutableMap.Builder<SegmentId, SegmentMetadata> segmentMetadataBuilder = new ImmutableMap.Builder<>();

Expand Down Expand Up @@ -1166,18 +1167,21 @@ public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
"Unexpected 'null' when polling segments from the db, aborting snapshot update."
);

if (segments.isEmpty() && schemaMap.isEmpty()) {
log.info("No segments found in the database!");
} else {
log.info(
"Polled and found [%,d] segments and [%,d] schemas in the database in [%,d] ms.",
segments.size(), schemaMap.size(), stopwatch.millisElapsed()
);
}
stopwatch.stop();
emitMetric("segment/pollWithSchema/time", stopwatch.millisElapsed());
log.info(
"Polled and found [%,d] segments and [%,d] schemas in the database in [%,d]ms.",
segments.size(), schemaMap.size(), stopwatch.millisElapsed()
);

createDatasourcesSnapshot(segments);
}

/**
 * Emits a single service metric with the given name and value through
 * the injected {@code serviceEmitter}.
 */
private void emitMetric(String metricName, long value)
{
  final ServiceMetricEvent.Builder eventBuilder = new ServiceMetricEvent.Builder();
  serviceEmitter.emit(eventBuilder.setMetric(metricName, value));
}

private void createDatasourcesSnapshot(List<DataSegment> segments)
{
final Stopwatch stopwatch = Stopwatch.createStarted();
Expand All @@ -1195,8 +1199,9 @@ private void createDatasourcesSnapshot(List<DataSegment> segments)
Iterables.filter(segments, Objects::nonNull), // Filter corrupted entries (see above in this method).
dataSourceProperties
);
log.info(
"Successfully created snapshot from polled segments in [%d] ms. Found [%d] overshadowed segments.",
emitMetric("segment/buildSnapshot/time", stopwatch.millisElapsed());
log.debug(
"Created snapshot from polled segments in [%d]ms. Found [%d] overshadowed segments.",
stopwatch.millisElapsed(), dataSourcesSnapshot.getOvershadowedSegments().size()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaCache;

Expand All @@ -33,6 +34,7 @@ public class SqlSegmentsMetadataManagerProvider implements SegmentsMetadataManag
private final Supplier<MetadataStorageTablesConfig> storageConfig;
private final SQLMetadataConnector connector;
private final Lifecycle lifecycle;
private final ServiceEmitter serviceEmitter;
private final SegmentSchemaCache segmentSchemaCache;
private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;

Expand All @@ -44,14 +46,16 @@ public SqlSegmentsMetadataManagerProvider(
SQLMetadataConnector connector,
Lifecycle lifecycle,
SegmentSchemaCache segmentSchemaCache,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
ServiceEmitter serviceEmitter
)
{
this.jsonMapper = jsonMapper;
this.config = config;
this.storageConfig = storageConfig;
this.connector = connector;
this.lifecycle = lifecycle;
this.serviceEmitter = serviceEmitter;
this.segmentSchemaCache = segmentSchemaCache;
this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
}
Expand Down Expand Up @@ -84,7 +88,8 @@ public void stop()
storageConfig,
connector,
segmentSchemaCache,
centralizedDatasourceSchemaConfig
centralizedDatasourceSchemaConfig,
serviceEmitter
);
}
}
Loading