diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index 2929229b4c59..4aaa95e12a63 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -19,6 +19,7 @@ package org.apache.druid.sql.calcite.schema; +import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; @@ -63,7 +64,6 @@ import org.apache.druid.sql.calcite.view.DruidViewMacro; import org.apache.druid.sql.calcite.view.ViewManager; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.SegmentId; import java.io.IOException; import java.util.Comparator; @@ -95,8 +95,9 @@ public class DruidSchema extends AbstractSchema private static final EmittingLogger log = new EmittingLogger(DruidSchema.class); private static final int MAX_SEGMENTS_PER_QUERY = 15000; - private static final long IS_PUBLISHED = 0; - private static final long IS_AVAILABLE = 1; + private static final long DEFAULT_IS_PUBLISHED = 0; + private static final long DEFAULT_IS_AVAILABLE = 1; + private static final long DEFAULT_NUM_ROWS = 0; private final QueryLifecycleFactory queryLifecycleFactory; private final PlannerConfig config; @@ -107,12 +108,12 @@ public class DruidSchema extends AbstractSchema // For awaitInitialization. private final CountDownLatch initialized = new CountDownLatch(1); - // Protects access to segmentSignatures, mutableSegments, segmentsNeedingRefresh, lastRefresh, isServerViewInitialized + // Protects access to segmentMetadataInfo, mutableSegments, segmentsNeedingRefresh, lastRefresh, isServerViewInitialized private final Object lock = new Object(); // DataSource -> Segment -> SegmentMetadataHolder(contains RowSignature) for that segment. 
// Use TreeMap for segments so they are merged in deterministic order, from older to newer. - // This data structure need to be accessed in a thread-safe way since SystemSchema accesses it + @GuardedBy("lock") private final Map<String, TreeMap<DataSegment, SegmentMetadataHolder>> segmentMetadataInfo = new HashMap<>(); private int totalSegments = 0; @@ -351,7 +352,8 @@ protected Multimap getFunctionMultim return builder.build(); } - private void addSegment(final DruidServerMetadata server, final DataSegment segment) + @VisibleForTesting + void addSegment(final DruidServerMetadata server, final DataSegment segment) { synchronized (lock) { final Map<DataSegment, SegmentMetadataHolder> knownSegments = segmentMetadataInfo.get(segment.getDataSource()); @@ -360,16 +362,18 @@ private void addSegment(final DruidServerMetadata server, final DataSegm // segmentReplicatable is used to determine if segments are served by realtime servers or not final long isRealtime = server.segmentReplicatable() ? 0 : 1; - final Map<SegmentId, Set<String>> serverSegmentMap = ImmutableMap.of( + final Set<String> servers = ImmutableSet.of(server.getName()); + holder = SegmentMetadataHolder.builder( segment.getId(), - ImmutableSet.of(server.getName()) - ); - - holder = SegmentMetadataHolder - .builder(segment.getId(), IS_PUBLISHED, IS_AVAILABLE, isRealtime, serverSegmentMap) - .build(); + DEFAULT_IS_PUBLISHED, + DEFAULT_IS_AVAILABLE, + isRealtime, + servers, + null, + DEFAULT_NUM_ROWS ).build(); // Unknown segment. 
- setSegmentSignature(segment, holder); + setSegmentMetadataHolder(segment, holder); segmentsNeedingRefresh.add(segment); if (!server.segmentReplicatable()) { log.debug("Added new mutable segment[%s].", segment.getId()); @@ -378,14 +382,14 @@ private void addSegment(final DruidServerMetadata server, final DataSegment segm log.debug("Added new immutable segment[%s].", segment.getId()); } } else { - final Map> segmentServerMap = holder.getReplicas(); + final Set segmentServers = holder.getReplicas(); final ImmutableSet servers = new ImmutableSet.Builder() - .addAll(segmentServerMap.get(segment.getId())) + .addAll(segmentServers) .add(server.getName()) .build(); final SegmentMetadataHolder holderWithNumReplicas = SegmentMetadataHolder .from(holder) - .withReplicas(ImmutableMap.of(segment.getId(), servers)) + .withReplicas(servers) .build(); knownSegments.put(segment, holderWithNumReplicas); if (server.segmentReplicatable()) { @@ -404,7 +408,7 @@ private void addSegment(final DruidServerMetadata server, final DataSegment segm } @VisibleForTesting - protected void removeSegment(final DataSegment segment) + void removeSegment(final DataSegment segment) { synchronized (lock) { log.debug("Segment[%s] is gone.", segment.getId()); @@ -435,13 +439,13 @@ private void removeServerSegment(final DruidServerMetadata server, final DataSeg log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName()); final Map knownSegments = segmentMetadataInfo.get(segment.getDataSource()); final SegmentMetadataHolder holder = knownSegments.get(segment); - final Map> segmentServerMap = holder.getReplicas(); - final ImmutableSet servers = FluentIterable.from(segmentServerMap.get(segment.getId())) + final Set segmentServers = holder.getReplicas(); + final ImmutableSet servers = FluentIterable.from(segmentServers) .filter(Predicates.not(Predicates.equalTo(server.getName()))) .toSet(); final SegmentMetadataHolder holderWithNumReplicas = SegmentMetadataHolder .from(holder) - 
.withReplicas(ImmutableMap.of(segment.getId(), servers)) + .withReplicas(servers) .build(); knownSegments.put(segment, holderWithNumReplicas); lock.notifyAll(); @@ -453,7 +457,7 @@ private void removeServerSegment(final DruidServerMetadata server, final DataSeg * which may be a subset of the asked-for set. */ @VisibleForTesting - protected Set refreshSegments(final Set segments) throws IOException + Set refreshSegments(final Set segments) throws IOException { final Set retVal = new HashSet<>(); @@ -525,7 +529,7 @@ private Set refreshSegmentsForDataSource(final String dataSource, f .withNumRows(analysis.getNumRows()) .build(); dataSourceSegments.put(segment, updatedHolder); - setSegmentSignature(segment, updatedHolder); + setSegmentMetadataHolder(segment, updatedHolder); retVal.add(segment); } } @@ -550,7 +554,8 @@ private Set refreshSegmentsForDataSource(final String dataSource, f return retVal; } - private void setSegmentSignature(final DataSegment segment, final SegmentMetadataHolder segmentMetadataHolder) + @VisibleForTesting + void setSegmentMetadataHolder(final DataSegment segment, final SegmentMetadataHolder segmentMetadataHolder) { synchronized (lock) { TreeMap dataSourceSegments = segmentMetadataInfo.computeIfAbsent( diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java index f2d5ab313b5c..38ff92858ec0 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java @@ -23,7 +23,6 @@ import org.apache.druid.timeline.SegmentId; import javax.annotation.Nullable; -import java.util.Map; import java.util.Set; /** @@ -36,15 +35,25 @@ public static Builder builder( long isPublished, long isAvailable, long isRealtime, - Map> segmentServerMap + Set segmentServers, + RowSignature rowSignature, + long numRows ) { - return new 
Builder(segmentId, isPublished, isAvailable, isRealtime, segmentServerMap); + return new Builder(segmentId, isPublished, isAvailable, isRealtime, segmentServers, rowSignature, numRows); } public static Builder from(SegmentMetadataHolder h) { - return new Builder(h.getSegmentId(), h.isPublished(), h.isAvailable(), h.isRealtime(), h.getReplicas()); + return new Builder( + h.getSegmentId(), + h.isPublished(), + h.isAvailable(), + h.isRealtime(), + h.getReplicas(), + h.getRowSignature(), + h.getNumRows() + ); } private final SegmentId segmentId; @@ -54,8 +63,8 @@ public static Builder from(SegmentMetadataHolder h) private final long isPublished; private final long isAvailable; private final long isRealtime; - //segmentId -> set of servers that contain the segment - private final Map> segmentServerMap; + // set of servers that contain the segment + private final Set segmentServers; private final long numRows; @Nullable private final RowSignature rowSignature; @@ -66,7 +75,7 @@ private SegmentMetadataHolder(Builder builder) this.isPublished = builder.isPublished; this.isAvailable = builder.isAvailable; this.isRealtime = builder.isRealtime; - this.segmentServerMap = builder.segmentServerMap; + this.segmentServers = builder.segmentServers; this.numRows = builder.numRows; this.segmentId = builder.segmentId; } @@ -91,14 +100,14 @@ public SegmentId getSegmentId() return segmentId; } - public Map> getReplicas() + public Set getReplicas() { - return segmentServerMap; + return segmentServers; } - public long getNumReplicas(SegmentId segmentId) + public long getNumReplicas() { - return segmentServerMap.get(segmentId).size(); + return segmentServers.size(); } public long getNumRows() @@ -119,7 +128,7 @@ public static class Builder private final long isAvailable; private final long isRealtime; - private Map> segmentServerMap; + private Set segmentServers; @Nullable private RowSignature rowSignature; private long numRows; @@ -129,14 +138,18 @@ private Builder( long isPublished, long 
isAvailable, long isRealtime, - Map> segmentServerMap + Set servers, + RowSignature rowSignature, + long numRows ) { this.segmentId = segmentId; this.isPublished = isPublished; this.isAvailable = isAvailable; this.isRealtime = isRealtime; - this.segmentServerMap = segmentServerMap; + this.segmentServers = servers; + this.rowSignature = rowSignature; + this.numRows = numRows; } public Builder withRowSignature(RowSignature rowSignature) @@ -151,9 +164,9 @@ public Builder withNumRows(long numRows) return this; } - public Builder withReplicas(Map> segmentServerMap) + public Builder withReplicas(Set servers) { - this.segmentServerMap = segmentServerMap; + this.segmentServers = servers; return this; } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index d0599f861903..e895113f8505 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -224,7 +224,7 @@ public Enumerable scan(DataContext root) Maps.newHashMapWithExpectedSize(druidSchema.getTotalSegments()); for (SegmentMetadataHolder h : availableSegmentMetadata.values()) { PartialSegmentData partialSegmentData = - new PartialSegmentData(h.isAvailable(), h.isRealtime(), h.getNumReplicas(h.getSegmentId()), h.getNumRows()); + new PartialSegmentData(h.isAvailable(), h.isRealtime(), h.getNumReplicas(), h.getNumRows()); partialSegmentDataMap.put(h.getSegmentId(), partialSegmentData); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java index 30b99e59ab44..e707c40aeb5c 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java @@ -27,6 +27,8 @@ import 
org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.schema.Table; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.client.ImmutableDruidServer; +import org.apache.druid.client.TimelineServerView; import org.apache.druid.data.input.InputRow; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; @@ -40,6 +42,7 @@ import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.security.NoopEscalator; import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.table.DruidTable; @@ -84,6 +87,8 @@ public class DruidSchemaTest extends CalciteTestBase private static QueryRunnerFactoryConglomerate conglomerate; private static Closer resourceCloser; + private List druidServers; + @BeforeClass public static void setUpClass() { @@ -163,10 +168,12 @@ public void setUp() throws Exception index2 ); + final TimelineServerView serverView = new TestServerInventoryView(walker.getSegments()); + druidServers = serverView.getDruidServers(); schema = new DruidSchema( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), - new TestServerInventoryView(walker.getSegments()), + serverView, PLANNER_CONFIG_DEFAULT, new NoopViewManager(), new NoopEscalator() @@ -239,6 +246,62 @@ public void testGetTableMapFoo2() Assert.assertEquals(SqlTypeName.BIGINT, fields.get(2).getType().getSqlTypeName()); } + /** + * This tests that {@link SegmentMetadataHolder#getNumRows()} is correct in case + * of multiple replicas i.e. 
when {@link DruidSchema#addSegment(DruidServerMetadata, DataSegment)} + * is called more than once for same segment + */ + @Test + public void testSegmentMetadataHolderNumRows() + { + Map segmentsMetadata = schema.getSegmentMetadata(); + final Set segments = segmentsMetadata.keySet(); + Assert.assertEquals(3, segments.size()); + // find the only segment with datasource "foo2" + final DataSegment existingSegment = segments.stream() + .filter(segment -> segment.getDataSource().equals("foo2")) + .findFirst() + .orElse(null); + Assert.assertNotNull(existingSegment); + final SegmentMetadataHolder existingHolder = segmentsMetadata.get(existingSegment); + // update SegmentMetadataHolder of existingSegment with numRows=5 + SegmentMetadataHolder updatedHolder = SegmentMetadataHolder.from(existingHolder).withNumRows(5).build(); + schema.setSegmentMetadataHolder(existingSegment, updatedHolder); + // find a druidServer holding existingSegment + final Pair pair = druidServers.stream() + .flatMap(druidServer -> druidServer.getSegments() + .stream() + .filter(segment -> segment + .equals( + existingSegment)) + .map(segment -> Pair + .of( + druidServer, + segment + ))) + .findAny() + .orElse(null); + Assert.assertNotNull(pair); + final ImmutableDruidServer server = pair.lhs; + Assert.assertNotNull(server); + final DruidServerMetadata druidServerMetadata = server.getMetadata(); + // invoke DruidSchema#addSegment on existingSegment + schema.addSegment(druidServerMetadata, existingSegment); + segmentsMetadata = schema.getSegmentMetadata(); + // get the only segment with datasource "foo2" + final DataSegment currentSegment = segments.stream() + .filter(segment -> segment.getDataSource().equals("foo2")) + .findFirst() + .orElse(null); + final SegmentMetadataHolder currentHolder = segmentsMetadata.get(currentSegment); + Assert.assertEquals(updatedHolder.getSegmentId(), currentHolder.getSegmentId()); + Assert.assertEquals(updatedHolder.getNumRows(), currentHolder.getNumRows()); + // 
numreplicas do not change here since we addSegment with the same server which was serving existingSegment before + Assert.assertEquals(updatedHolder.getNumReplicas(), currentHolder.getNumReplicas()); + Assert.assertEquals(updatedHolder.isAvailable(), currentHolder.isAvailable()); + Assert.assertEquals(updatedHolder.isPublished(), currentHolder.isPublished()); + } + @Test public void testNullDatasource() throws IOException { @@ -247,7 +310,10 @@ public void testNullDatasource() throws IOException Assert.assertEquals(segments.size(), 3); // segments contains two segments with datasource "foo" and one with datasource "foo2" // let's remove the only segment with datasource "foo2" - final DataSegment segmentToRemove = segments.stream().filter(segment -> segment.getDataSource().equals("foo2")).findFirst().orElse(null); + final DataSegment segmentToRemove = segments.stream() + .filter(segment -> segment.getDataSource().equals("foo2")) + .findFirst() + .orElse(null); Assert.assertFalse(segmentToRemove == null); schema.removeSegment(segmentToRemove); schema.refreshSegments(segments); // can cause NPE without dataSourceSegments null check in DruidSchema#refreshSegmentsForDataSource @@ -262,8 +328,11 @@ public void testNullSegmentMetadataHolder() throws IOException Map segmentMetadatas = schema.getSegmentMetadata(); Set segments = segmentMetadatas.keySet(); Assert.assertEquals(segments.size(), 3); - //remove one of the segments with datasource "foo" - final DataSegment segmentToRemove = segments.stream().filter(segment -> segment.getDataSource().equals("foo")).findFirst().orElse(null); + // remove one of the segments with datasource "foo" + final DataSegment segmentToRemove = segments.stream() + .filter(segment -> segment.getDataSource().equals("foo")) + .findFirst() + .orElse(null); Assert.assertFalse(segmentToRemove == null); schema.removeSegment(segmentToRemove); schema.refreshSegments(segments); // can cause NPE without holder null check in SegmentMetadataHolder#from diff 
--git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 7d8cdaad5729..d354f141d9af 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -118,6 +118,11 @@ public class SystemSchemaTest extends CalciteTestBase CalciteTests.createRow(ImmutableMap.of("t", "2001-01-03", "m1", "6.0")) ); + private static final List ROWS3 = ImmutableList.of( + CalciteTests.createRow(ImmutableMap.of("t", "2001-01-01", "m1", "7.0", "dim3", ImmutableList.of("x"))), + CalciteTests.createRow(ImmutableMap.of("t", "2001-01-02", "m1", "8.0", "dim3", ImmutableList.of("xyz"))) + ); + private SystemSchema schema; private SpecificSegmentsQuerySegmentWalker walker; private DruidLeaderClient client; @@ -204,11 +209,22 @@ public Authorizer getAuthorizer(String name) ) .rows(ROWS2) .buildMMappedIndex(); + final QueryableIndex index3 = IndexBuilder.create() + .tmpDir(new File(tmpDir, "3")) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema( + new IncrementalIndexSchema.Builder() + .withMetrics(new LongSumAggregatorFactory("m1", "m1")) + .withRollup(false) + .build() + ) + .rows(ROWS3) + .buildMMappedIndex(); walker = new SpecificSegmentsQuerySegmentWalker(conglomerate) .add(segment1, index1) .add(segment2, index2) - .add(segment3, index2); + .add(segment3, index3); druidSchema = new DruidSchema( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), @@ -469,7 +485,7 @@ public Object get(String name) 100L, 2L, //partition_num 1L, //num_replicas - 3L, //numRows + 2L, //numRows 0L, //is_published 1L, //is_available 0L //is_realtime @@ -481,7 +497,7 @@ public Object get(String name) 100L, 0L, //partition_num 1L, //num_replicas - 0L, //numRows = 3 + 0L, //numRows 0L, //is_published 1L, //is_available 1L //is_realtime diff 
--git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java index 6718b1bd1f80..2dcc56959837 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java @@ -20,7 +20,9 @@ package org.apache.druid.sql.calcite.util; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import org.apache.druid.client.DruidServer; +import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.selector.ServerSelector; @@ -33,6 +35,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.Executor; @@ -83,7 +86,14 @@ public TimelineLookup getTimeline(DataSource dataSource) @Override public List getDruidServers() { - throw new UnsupportedOperationException(); + final ImmutableDruidDataSource dataSource = new ImmutableDruidDataSource("DUMMY", Collections.emptyMap(), segments); + final ImmutableDruidServer server = new ImmutableDruidServer( + DUMMY_SERVER, + 0L, + ImmutableMap.of("src", dataSource), + 1 + ); + return ImmutableList.of(server); } @Override