From 2f1da9de8a07e28703328ed28daf5426806839a9 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Thu, 9 Nov 2023 19:17:18 +0530 Subject: [PATCH 1/2] Fix schema refresh logic --- .../CoordinatorSegmentMetadataCache.java | 3 +- .../CoordinatorSegmentMetadataCacheTest.java | 136 +++++++++++++++++ .../metadata/SegmentMetadataCacheCommon.java | 2 +- .../schema/BrokerSegmentMetadataCache.java | 2 +- .../BrokerSegmentMetadataCacheTest.java | 138 ++++++++++++++++++ 5 files changed, 278 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java index 960921c9e93b..8aa5a90739de 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java @@ -136,8 +136,9 @@ public void refresh(final Set segmentsToRefresh, final Set da if (rowSignature == null) { log.info("RowSignature null for dataSource [%s], implying it no longer exists, all metadata removed.", dataSource); tables.remove(dataSource); - return; + continue; } + DataSourceInformation druidTable = new DataSourceInformation(dataSource, rowSignature); final DataSourceInformation oldTable = tables.put(dataSource, druidTable); if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) { diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java index 22a5d7a67c41..aa6cc0cb88fd 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.metadata; 
import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -27,6 +28,7 @@ import com.google.common.collect.Sets; import org.apache.druid.client.DruidServer; import org.apache.druid.client.InternalQueryConfig; +import org.apache.druid.data.input.InputRow; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.Sequences; @@ -34,14 +36,21 @@ import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; +import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import org.apache.druid.query.metadata.metadata.AllColumnIncluderator; import org.apache.druid.query.metadata.metadata.ColumnAnalysis; import org.apache.druid.query.metadata.metadata.SegmentAnalysis; import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery; import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; +import org.apache.druid.segment.IndexBuilder; +import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.QueryLifecycle; import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.QueryResponse; @@ -53,16 +62,19 @@ import org.apache.druid.server.security.NoopEscalator; import org.apache.druid.timeline.DataSegment; import 
org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.partition.LinearShardSpec; import org.easymock.EasyMock; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.File; import java.io.IOException; import java.util.EnumSet; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -361,6 +373,130 @@ public void testNullDatasource() throws IOException, InterruptedException Assert.assertEquals(5, schema.getSegmentMetadataSnapshot().size()); } + @Test + public void testAllDatasourcesRebuiltOnDatasourceRemoval() throws IOException, InterruptedException + { + CountDownLatch addSegmentLatch = new CountDownLatch(7); + CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache( + getQueryLifecycleFactory(walker), + serverView, + SEGMENT_CACHE_CONFIG_DEFAULT, + new NoopEscalator(), + new InternalQueryConfig(), + new NoopServiceEmitter() + ) + { + @Override + public void addSegment(final DruidServerMetadata server, final DataSegment segment) + { + super.addSegment(server, segment); + addSegmentLatch.countDown(); + } + + @Override + public void removeSegment(final DataSegment segment) + { + super.removeSegment(segment); + } + + @Override + public void markDataSourceAsNeedRebuild(String datasource) + { + super.markDataSourceAsNeedRebuild(datasource); + markDataSourceLatch.countDown(); + } + + @Override + @VisibleForTesting + public void refresh( + final Set segmentsToRefresh, + final Set dataSourcesToRebuild) throws IOException + { + super.refresh(segmentsToRefresh, dataSourcesToRebuild); + } + }; + + schema.start(); + schema.awaitInitialization(); + + final Map segmentMetadatas = schema.getSegmentMetadataSnapshot(); + List segments = segmentMetadatas.values() + .stream() + .map(AvailableSegmentMetadata::getSegment) + .collect(Collectors.toList()); + Assert.assertEquals(6, 
segments.size()); + // segments contains two segments with datasource "foo" and one with datasource "foo2" + // let's remove the only segment with datasource "foo2" + final DataSegment segmentToRemove = segments.stream() + .filter(segment -> segment.getDataSource().equals("foo2")) + .findFirst() + .orElse(null); + Assert.assertNotNull(segmentToRemove); + schema.removeSegment(segmentToRemove); + + // we will add a segment to another datasource and + // check if columns for this segment is reflected in the datasoruce schema + DataSegment newSegment = + DataSegment.builder() + .dataSource(DATASOURCE1) + .interval(Intervals.of("2002/P1Y")) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .size(0) + .build(); + + final File tmpDir = temporaryFolder.newFolder(); + + List rows = ImmutableList.of( + createRow(ImmutableMap.of("t", "2002-01-01", "m1", "1.0", "dim1", "", "dim2", "x")), + createRow(ImmutableMap.of("t", "2002-01-02", "m1", "2.0", "dim1", "10.1", "dim2", "x")), + createRow(ImmutableMap.of("t", "2002-01-03", "m1", "3.0", "dim1", "2", "dim2", "x")) + ); + + QueryableIndex index = IndexBuilder.create() + .tmpDir(new File(tmpDir, "1")) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema( + new IncrementalIndexSchema.Builder() + .withMetrics( + new CountAggregatorFactory("cnt"), + new DoubleSumAggregatorFactory("m1", "m1"), + new HyperUniquesAggregatorFactory("unique_dim1", "dim1") + ) + .withRollup(false) + .build() + ) + .rows(rows) + .buildMMappedIndex(); + + walker.add(newSegment, index); + serverView.addSegment(newSegment, ServerType.HISTORICAL); + + Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS)); + + Set dataSources = segments.stream().map(DataSegment::getDataSource).collect(Collectors.toSet()); + dataSources.remove("foo2"); + + // LinkedHashSet to ensure we encounter the remove datasource first + Set dataSourcesToRefresh = new LinkedHashSet<>(); + dataSourcesToRefresh.add("foo2"); + 
dataSourcesToRefresh.addAll(dataSources); + + segments = segmentMetadatas.values() + .stream() + .map(AvailableSegmentMetadata::getSegment) + .collect(Collectors.toList()); + + schema.refresh(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()), dataSourcesToRefresh); + Assert.assertEquals(6, schema.getSegmentMetadataSnapshot().size()); + + final DataSourceInformation fooDs = schema.getDatasource("foo"); + + // check if the new column present in the added segment is present in the datasource schema + // ensuring that the schema is rebuilt + Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().anyMatch("dim2"::equals)); + } + @Test public void testNullAvailableSegmentMetadata() throws IOException, InterruptedException { diff --git a/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheCommon.java b/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheCommon.java index d421a15e35fc..fb7b87580e1f 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheCommon.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheCommon.java @@ -286,7 +286,7 @@ public void tearDown() throws Exception resourceCloser.close(); } - InputRow createRow(final ImmutableMap map) + public InputRow createRow(final ImmutableMap map) { return MapInputRowParser.parse(FOO_SCHEMA, (Map) map); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java index a52c6d892595..21c4a50996b7 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java @@ -201,7 +201,7 @@ public void refresh(final Set segmentsToRefresh, final Set da if (rowSignature == null) { log.info("datasource [%s] no longer exists, all metadata 
removed.", dataSource); tables.remove(dataSource); - return; + continue; } final PhysicalDatasourceMetadata physicalDatasourceMetadata = dataSourceMetadataFactory.build(dataSource, rowSignature); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java index 3cc566cfc1b0..448ef0801b34 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java @@ -36,6 +36,7 @@ import org.apache.druid.client.InternalQueryConfig; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.client.coordinator.NoopCoordinatorClient; +import org.apache.druid.data.input.InputRow; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.Sequences; @@ -44,15 +45,22 @@ import org.apache.druid.query.GlobalTableDataSource; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; +import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import org.apache.druid.query.metadata.metadata.AllColumnIncluderator; import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery; import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; +import org.apache.druid.segment.IndexBuilder; +import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexStorageAdapter; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; import 
org.apache.druid.segment.metadata.AbstractSegmentMetadataCache; import org.apache.druid.segment.metadata.AvailableSegmentMetadata; import org.apache.druid.segment.metadata.DataSourceInformation; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.QueryLifecycle; import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.QueryResponse; @@ -67,6 +75,7 @@ import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.partition.LinearShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.EasyMock; import org.junit.After; @@ -76,12 +85,14 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -575,6 +586,133 @@ public void testNullDatasource() throws IOException, InterruptedException Assert.assertEquals(5, schema.getSegmentMetadataSnapshot().size()); } + @Test + public void testAllDatasourcesRebuiltOnDatasourceRemoval() throws IOException, InterruptedException + { + CountDownLatch addSegmentLatch = new CountDownLatch(7); + BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache( + CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), + serverView, + SEGMENT_CACHE_CONFIG_DEFAULT, + new NoopEscalator(), + new InternalQueryConfig(), + new NoopServiceEmitter(), + new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), + new NoopCoordinatorClient() + ) + { + @Override + public void addSegment(final DruidServerMetadata server, final DataSegment segment) + { + super.addSegment(server, segment); + 
addSegmentLatch.countDown(); + } + + @Override + public void removeSegment(final DataSegment segment) + { + super.removeSegment(segment); + } + + @Override + public void markDataSourceAsNeedRebuild(String datasource) + { + super.markDataSourceAsNeedRebuild(datasource); + markDataSourceLatch.countDown(); + } + + @Override + @VisibleForTesting + public void refresh( + final Set segmentsToRefresh, + final Set dataSourcesToRebuild) throws IOException + { + super.refresh(segmentsToRefresh, dataSourcesToRebuild); + refreshLatch.countDown(); + } + }; + + schema.start(); + schema.awaitInitialization(); + + final Map segmentMetadatas = schema.getSegmentMetadataSnapshot(); + List segments = segmentMetadatas.values() + .stream() + .map(AvailableSegmentMetadata::getSegment) + .collect(Collectors.toList()); + Assert.assertEquals(6, segments.size()); + // segments contains two segments with datasource "foo" and one with datasource "foo2" + // let's remove the only segment with datasource "foo2" + final DataSegment segmentToRemove = segments.stream() + .filter(segment -> segment.getDataSource().equals("foo2")) + .findFirst() + .orElse(null); + Assert.assertNotNull(segmentToRemove); + schema.removeSegment(segmentToRemove); + + // we will add a segment to another datasource and + // check if columns for this segment is reflected in the datasoruce schema + DataSegment newSegment = + DataSegment.builder() + .dataSource(DATASOURCE1) + .interval(Intervals.of("2002/P1Y")) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .size(0) + .build(); + + final File tmpDir = temporaryFolder.newFolder(); + + List rows = ImmutableList.of( + createRow(ImmutableMap.of("t", "2002-01-01", "m1", "1.0", "dim1", "", "dim2", "x")), + createRow(ImmutableMap.of("t", "2002-01-02", "m1", "2.0", "dim1", "10.1", "dim2", "x")), + createRow(ImmutableMap.of("t", "2002-01-03", "m1", "3.0", "dim1", "2", "dim2", "x")) + ); + + QueryableIndex index = IndexBuilder.create() + .tmpDir(new File(tmpDir, "1")) + 
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema( + new IncrementalIndexSchema.Builder() + .withMetrics( + new CountAggregatorFactory("cnt"), + new DoubleSumAggregatorFactory("m1", "m1"), + new HyperUniquesAggregatorFactory("unique_dim1", "dim1") + ) + .withRollup(false) + .build() + ) + .rows(rows) + .buildMMappedIndex(); + + walker.add(newSegment, index); + serverView.addSegment(newSegment, ServerType.HISTORICAL); + + Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS)); + + Set dataSources = segments.stream().map(DataSegment::getDataSource).collect(Collectors.toSet()); + dataSources.remove("foo2"); + + // LinkedHashSet to ensure we encounter the remove datasource first + Set dataSourcesToRefresh = new LinkedHashSet<>(); + dataSourcesToRefresh.add("foo2"); + dataSourcesToRefresh.addAll(dataSources); + + segments = segmentMetadatas.values() + .stream() + .map(AvailableSegmentMetadata::getSegment) + .collect(Collectors.toList()); + + schema.refresh(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()), dataSourcesToRefresh); + Assert.assertEquals(6, schema.getSegmentMetadataSnapshot().size()); + + final DatasourceTable.PhysicalDatasourceMetadata fooDs = schema.getDatasource("foo"); + + // check if the new column present in the added segment is present in the datasource schema + // ensuring that the schema is rebuilt + Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().anyMatch("dim2"::equals)); + } + @Test public void testNullAvailableSegmentMetadata() throws IOException, InterruptedException { From de5f3ef8815afe321cfffa898ac7a0f7efe05d4c Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Fri, 10 Nov 2023 16:00:17 +0530 Subject: [PATCH 2/2] Update UT --- .../CoordinatorSegmentMetadataCache.java | 2 +- .../CoordinatorSegmentMetadataCacheTest.java | 19 +++++++----- .../BrokerSegmentMetadataCacheTest.java | 31 ++++++++++--------- 3 files changed, 30 insertions(+), 22 deletions(-) 
diff --git a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java index 8aa5a90739de..1badb2383d44 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java @@ -134,7 +134,7 @@ public void refresh(final Set segmentsToRefresh, final Set da for (String dataSource : dataSourcesToRebuild) { final RowSignature rowSignature = buildDataSourceRowSignature(dataSource); if (rowSignature == null) { - log.info("RowSignature null for dataSource [%s], implying it no longer exists, all metadata removed.", dataSource); + log.info("RowSignature null for dataSource [%s], implying that it no longer exists. All metadata removed.", dataSource); tables.remove(dataSource); continue; } diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java index aa6cc0cb88fd..31176a17f192 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java @@ -425,6 +425,11 @@ public void refresh( .map(AvailableSegmentMetadata::getSegment) .collect(Collectors.toList()); Assert.assertEquals(6, segments.size()); + + // verify that dim3 column isn't present in schema for datasource foo + DataSourceInformation fooDs = schema.getDatasource("foo"); + Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().noneMatch("dim3"::equals)); + // segments contains two segments with datasource "foo" and one with datasource "foo2" // let's remove the only segment with datasource "foo2" final DataSegment segmentToRemove = segments.stream() @@ -435,7 
+440,7 @@ public void refresh( schema.removeSegment(segmentToRemove); // we will add a segment to another datasource and - // check if columns for this segment is reflected in the datasoruce schema + // check if columns in this segment are reflected in the datasource schema DataSegment newSegment = DataSegment.builder() .dataSource(DATASOURCE1) @@ -448,9 +453,9 @@ public void refresh( final File tmpDir = temporaryFolder.newFolder(); List rows = ImmutableList.of( - createRow(ImmutableMap.of("t", "2002-01-01", "m1", "1.0", "dim1", "", "dim2", "x")), - createRow(ImmutableMap.of("t", "2002-01-02", "m1", "2.0", "dim1", "10.1", "dim2", "x")), - createRow(ImmutableMap.of("t", "2002-01-03", "m1", "3.0", "dim1", "2", "dim2", "x")) + createRow(ImmutableMap.of("t", "2002-01-01", "m1", "1.0", "dim1", "", "dim3", "c1")), + createRow(ImmutableMap.of("t", "2002-01-02", "m1", "2.0", "dim1", "10.1", "dim3", "c2")), + createRow(ImmutableMap.of("t", "2002-01-03", "m1", "3.0", "dim1", "2", "dim3", "c3")) ); QueryableIndex index = IndexBuilder.create() @@ -482,7 +487,7 @@ public void refresh( dataSourcesToRefresh.add("foo2"); dataSourcesToRefresh.addAll(dataSources); - segments = segmentMetadatas.values() + segments = schema.getSegmentMetadataSnapshot().values() .stream() .map(AvailableSegmentMetadata::getSegment) .collect(Collectors.toList()); @@ -490,11 +495,11 @@ public void refresh( schema.refresh(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()), dataSourcesToRefresh); Assert.assertEquals(6, schema.getSegmentMetadataSnapshot().size()); - final DataSourceInformation fooDs = schema.getDatasource("foo"); + fooDs = schema.getDatasource("foo"); // check if the new column present in the added segment is present in the datasource schema // ensuring that the schema is rebuilt - Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().anyMatch("dim2"::equals)); + Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().anyMatch("dim3"::equals)); }
@Test diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java index 448ef0801b34..dd29082b8587 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java @@ -618,7 +618,6 @@ public void removeSegment(final DataSegment segment) public void markDataSourceAsNeedRebuild(String datasource) { super.markDataSourceAsNeedRebuild(datasource); - markDataSourceLatch.countDown(); } @@ -628,7 +627,6 @@ public void refresh( final Set dataSourcesToRebuild) throws IOException { super.refresh(segmentsToRefresh, dataSourcesToRebuild); - refreshLatch.countDown(); } }; @@ -641,6 +639,11 @@ public void refresh( .map(AvailableSegmentMetadata::getSegment) .collect(Collectors.toList()); Assert.assertEquals(6, segments.size()); + + // verify that dim3 column isn't present in the schema for foo + DatasourceTable.PhysicalDatasourceMetadata fooDs = schema.getDatasource("foo"); + Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().noneMatch("dim3"::equals)); + // segments contains two segments with datasource "foo" and one with datasource "foo2" // let's remove the only segment with datasource "foo2" final DataSegment segmentToRemove = segments.stream() @@ -651,10 +654,10 @@ public void refresh( schema.removeSegment(segmentToRemove); // we will add a segment to another datasource and - // check if columns for this segment is reflected in the datasoruce schema + // check if columns in this segment are reflected in the datasource schema DataSegment newSegment = DataSegment.builder() - .dataSource(DATASOURCE1) + .dataSource("foo") .interval(Intervals.of("2002/P1Y")) .version("1") .shardSpec(new LinearShardSpec(0)) .size(0) .build(); @@ -664,9 +667,9 @@ public void refresh( final File tmpDir =
temporaryFolder.newFolder(); List rows = ImmutableList.of( - createRow(ImmutableMap.of("t", "2002-01-01", "m1", "1.0", "dim1", "", "dim2", "x")), - createRow(ImmutableMap.of("t", "2002-01-02", "m1", "2.0", "dim1", "10.1", "dim2", "x")), - createRow(ImmutableMap.of("t", "2002-01-03", "m1", "3.0", "dim1", "2", "dim2", "x")) + createRow(ImmutableMap.of("t", "2002-01-01", "m1", "1.0", "dim1", "", "dim3", "c1")), + createRow(ImmutableMap.of("t", "2002-01-02", "m1", "2.0", "dim1", "10.1", "dim3", "c2")), + createRow(ImmutableMap.of("t", "2002-01-03", "m1", "3.0", "dim1", "2", "dim3", "c3")) ); QueryableIndex index = IndexBuilder.create() @@ -693,24 +696,24 @@ public void refresh( Set dataSources = segments.stream().map(DataSegment::getDataSource).collect(Collectors.toSet()); dataSources.remove("foo2"); - // LinkedHashSet to ensure we encounter the remove datasource first + // LinkedHashSet to ensure that the datasource with no segments is encountered first Set dataSourcesToRefresh = new LinkedHashSet<>(); dataSourcesToRefresh.add("foo2"); dataSourcesToRefresh.addAll(dataSources); - segments = segmentMetadatas.values() - .stream() - .map(AvailableSegmentMetadata::getSegment) - .collect(Collectors.toList()); + segments = schema.getSegmentMetadataSnapshot().values() + .stream() + .map(AvailableSegmentMetadata::getSegment) + .collect(Collectors.toList()); schema.refresh(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()), dataSourcesToRefresh); Assert.assertEquals(6, schema.getSegmentMetadataSnapshot().size()); - final DatasourceTable.PhysicalDatasourceMetadata fooDs = schema.getDatasource("foo"); + fooDs = schema.getDatasource("foo"); // check if the new column present in the added segment is present in the datasource schema // ensuring that the schema is rebuilt - Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().anyMatch("dim2"::equals)); + Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().anyMatch("dim3"::equals)); } 
@Test