@@ -134,10 +134,11 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
    for (String dataSource : dataSourcesToRebuild) {
      final RowSignature rowSignature = buildDataSourceRowSignature(dataSource);
      if (rowSignature == null) {
-        log.info("RowSignature null for dataSource [%s], implying it no longer exists, all metadata removed.", dataSource);
+        log.info("RowSignature null for dataSource [%s], implying that it no longer exists. All metadata removed.", dataSource);
        tables.remove(dataSource);
-        return;
+        continue;
      }

      DataSourceInformation druidTable = new DataSourceInformation(dataSource, rowSignature);
      final DataSourceInformation oldTable = tables.put(dataSource, druidTable);
      if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
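
Both this hunk and the matching broker-side hunk at the end of this diff fix the same control-flow bug: a return inside the rebuild loop aborted the whole refresh pass as soon as one datasource's row signature came back null, leaving every datasource after it in the iteration stale. A minimal standalone sketch of the pattern, with hypothetical class and method names rather than the actual Druid code:

import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RefreshLoopSketch
{
  private final Map<String, String> tables = new LinkedHashMap<>();

  // Stand-in for buildDataSourceRowSignature(): returns null once a
  // datasource no longer has any segments.
  private String buildRowSignature(String dataSource)
  {
    return "foo2".equals(dataSource) ? null : dataSource + "-signature";
  }

  public void rebuild(Set<String> dataSourcesToRebuild)
  {
    for (String dataSource : dataSourcesToRebuild) {
      String rowSignature = buildRowSignature(dataSource);
      if (rowSignature == null) {
        tables.remove(dataSource);
        // With `return` here (the old behavior), hitting a removed
        // datasource first would abort the loop and leave every
        // remaining datasource stale; `continue` keeps going.
        continue;
      }
      tables.put(dataSource, rowSignature);
    }
  }

  public static void main(String[] args)
  {
    RefreshLoopSketch sketch = new RefreshLoopSketch();
    // LinkedHashSet so the removed datasource is iterated first --
    // the same ordering the new test below forces.
    Set<String> toRebuild = new LinkedHashSet<>(List.of("foo2", "foo"));
    sketch.rebuild(toRebuild);
    System.out.println(sketch.tables); // {foo=foo-signature}
  }
}

Swapping the continue back to a return makes main() print an empty map, because the removed datasource is iterated first and "foo" is never rebuilt.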
@@ -20,28 +20,37 @@
package org.apache.druid.segment.metadata;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.InternalQueryConfig;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.metadata.metadata.AllColumnIncluderator;
import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryLifecycle;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.QueryResponse;
@@ -53,16 +62,19 @@
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -361,6 +373,135 @@ public void testNullDatasource() throws IOException, InterruptedException
    Assert.assertEquals(5, schema.getSegmentMetadataSnapshot().size());
  }

  @Test
  public void testAllDatasourcesRebuiltOnDatasourceRemoval() throws IOException, InterruptedException
  {
    CountDownLatch addSegmentLatch = new CountDownLatch(7);
    CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
        getQueryLifecycleFactory(walker),
        serverView,
        SEGMENT_CACHE_CONFIG_DEFAULT,
        new NoopEscalator(),
        new InternalQueryConfig(),
        new NoopServiceEmitter()
    )
    {
      @Override
      public void addSegment(final DruidServerMetadata server, final DataSegment segment)
      {
        super.addSegment(server, segment);
        addSegmentLatch.countDown();
      }

      @Override
      public void removeSegment(final DataSegment segment)
      {
        super.removeSegment(segment);
      }

      @Override
      public void markDataSourceAsNeedRebuild(String datasource)
      {
        super.markDataSourceAsNeedRebuild(datasource);
        markDataSourceLatch.countDown();
      }

      @Override
      @VisibleForTesting
      public void refresh(
          final Set<SegmentId> segmentsToRefresh,
          final Set<String> dataSourcesToRebuild) throws IOException
      {
        super.refresh(segmentsToRefresh, dataSourcesToRebuild);
      }
    };

    schema.start();
    schema.awaitInitialization();

    final Map<SegmentId, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadataSnapshot();
    List<DataSegment> segments = segmentMetadatas.values()
        .stream()
        .map(AvailableSegmentMetadata::getSegment)
        .collect(Collectors.toList());
    Assert.assertEquals(6, segments.size());

    // verify that the dim3 column isn't present in the schema for datasource foo
    DataSourceInformation fooDs = schema.getDatasource("foo");
    Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().noneMatch("dim3"::equals));

    // segments contains two segments with datasource "foo" and one with datasource "foo2";
    // remove the only segment with datasource "foo2"
    final DataSegment segmentToRemove = segments.stream()
        .filter(segment -> segment.getDataSource().equals("foo2"))
        .findFirst()
        .orElse(null);
    Assert.assertNotNull(segmentToRemove);
    schema.removeSegment(segmentToRemove);

    // add a segment to another datasource and
    // check that the columns in this segment are reflected in the datasource schema
    DataSegment newSegment =
        DataSegment.builder()
            .dataSource(DATASOURCE1)
            .interval(Intervals.of("2002/P1Y"))
            .version("1")
            .shardSpec(new LinearShardSpec(0))
            .size(0)
            .build();

    final File tmpDir = temporaryFolder.newFolder();

    List<InputRow> rows = ImmutableList.of(
        createRow(ImmutableMap.of("t", "2002-01-01", "m1", "1.0", "dim1", "", "dim3", "c1")),
        createRow(ImmutableMap.of("t", "2002-01-02", "m1", "2.0", "dim1", "10.1", "dim3", "c2")),
        createRow(ImmutableMap.of("t", "2002-01-03", "m1", "3.0", "dim1", "2", "dim3", "c3"))
    );

    QueryableIndex index = IndexBuilder.create()
        .tmpDir(new File(tmpDir, "1"))
        .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
        .schema(
            new IncrementalIndexSchema.Builder()
                .withMetrics(
                    new CountAggregatorFactory("cnt"),
                    new DoubleSumAggregatorFactory("m1", "m1"),
                    new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
                )
                .withRollup(false)
                .build()
        )
        .rows(rows)
        .buildMMappedIndex();

    walker.add(newSegment, index);
    serverView.addSegment(newSegment, ServerType.HISTORICAL);

    Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));

    Set<String> dataSources = segments.stream().map(DataSegment::getDataSource).collect(Collectors.toSet());
    dataSources.remove("foo2");

    // LinkedHashSet to ensure we encounter the removed datasource first
    Set<String> dataSourcesToRefresh = new LinkedHashSet<>();
    dataSourcesToRefresh.add("foo2");
    dataSourcesToRefresh.addAll(dataSources);

    segments = schema.getSegmentMetadataSnapshot().values()
        .stream()
        .map(AvailableSegmentMetadata::getSegment)
        .collect(Collectors.toList());

    schema.refresh(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()), dataSourcesToRefresh);
    Assert.assertEquals(6, schema.getSegmentMetadataSnapshot().size());

    fooDs = schema.getDatasource("foo");

    // check that the new column present in the added segment appears in the datasource schema,
    // ensuring that the schema was rebuilt
    Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().anyMatch("dim3"::equals));
  }
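
A side note on the synchronization idiom the test relies on: the anonymous subclass overrides the cache's callbacks purely to count down latches, letting the test block until the asynchronously delivered segment has actually been processed before asserting anything. A minimal sketch of that idiom under hypothetical names (EventCache is not a Druid class):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical base class whose callback fires on another thread.
class EventCache
{
  public void addSegment(String segmentId)
  {
    // pretend this does the real bookkeeping
  }
}

public class LatchSketch
{
  public static void main(String[] args) throws InterruptedException
  {
    CountDownLatch addSegmentLatch = new CountDownLatch(1);

    // Subclass hooks the callback and counts down only after the real
    // work ran, mirroring the overridden addSegment() in the test above.
    EventCache cache = new EventCache()
    {
      @Override
      public void addSegment(String segmentId)
      {
        super.addSegment(segmentId);
        addSegmentLatch.countDown();
      }
    };

    ExecutorService exec = Executors.newSingleThreadExecutor();
    exec.submit(() -> cache.addSegment("seg1"));

    // Assertions are safe only once the latch has fired.
    if (!addSegmentLatch.await(1, TimeUnit.SECONDS)) {
      throw new AssertionError("segment was never added");
    }
    exec.shutdown();
  }
}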

  @Test
  public void testNullAvailableSegmentMetadata() throws IOException, InterruptedException
  {
@@ -286,7 +286,7 @@ public void tearDown() throws Exception
    resourceCloser.close();
  }

-  InputRow createRow(final ImmutableMap<String, ?> map)
+  public InputRow createRow(final ImmutableMap<String, ?> map)
  {
    return MapInputRowParser.parse(FOO_SCHEMA, (Map<String, Object>) map);
  }
@@ -201,7 +201,7 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
      if (rowSignature == null) {
        log.info("datasource [%s] no longer exists, all metadata removed.", dataSource);
        tables.remove(dataSource);
-        return;
+        continue;
      }

      final PhysicalDatasourceMetadata physicalDatasourceMetadata = dataSourceMetadataFactory.build(dataSource, rowSignature);