From f7c263320bcff30fb5e8e2ba706868afb8cc3947 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Sat, 19 Jan 2019 22:51:14 -0800 Subject: [PATCH 1/7] Fix the bug with num_rows in sys.segments * Fix segmentMetadataInfo update in DruidSchema * Add numRows to SegmentMetadataHolder builder's constructor, so it's not overwritten * Rename SegSegmentSignature to setSegmentMetadataHolder and fix it so nested map is appended instead of recreated * Replace Map> segmentServerMap with Set for num_replica --- .../druid/sql/calcite/schema/DruidSchema.java | 51 +++++++++++-------- .../calcite/schema/SegmentMetadataHolder.java | 29 ++++++----- .../sql/calcite/schema/SystemSchema.java | 2 +- 3 files changed, 48 insertions(+), 34 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index b2b2b675e809..f79dadda470b 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -93,6 +93,7 @@ public class DruidSchema extends AbstractSchema private static final int MAX_SEGMENTS_PER_QUERY = 15000; private static final long IS_PUBLISHED = 0; private static final long IS_AVAILABLE = 1; + private static final long NUM_ROWS = 0; private final QueryLifecycleFactory queryLifecycleFactory; private final PlannerConfig config; @@ -108,7 +109,7 @@ public class DruidSchema extends AbstractSchema // DataSource -> Segment -> SegmentMetadataHolder(contains RowSignature) for that segment. // Use TreeMap for segments so they are merged in deterministic order, from older to newer. - // This data structure need to be accessed in a thread-safe way since SystemSchema accesses it + // This data structure need to be accessed in a thread-safe way via lock Object private final Map> segmentMetadataInfo = new HashMap<>(); // All mutable segments. @@ -354,20 +355,19 @@ private void addSegment(final DruidServerMetadata server, final DataSegment segm // segmentReplicatable is used to determine if segments are served by realtime servers or not final long isRealtime = server.segmentReplicatable() ? 0 : 1; - final Map> serverSegmentMap = ImmutableMap.of( - segment.getIdentifier(), - ImmutableSet.of(server.getName()) - ); + final Set servers = ImmutableSet.of(server.getName()); final SegmentMetadataHolder holder = new SegmentMetadataHolder.Builder( segment.getIdentifier(), IS_PUBLISHED, IS_AVAILABLE, isRealtime, - serverSegmentMap + servers, + null, + NUM_ROWS ).build(); // Unknown segment. - setSegmentSignature(segment, holder); + setSegmentMetadataHolder(segment, holder); segmentsNeedingRefresh.add(segment); if (!server.segmentReplicatable()) { log.debug("Added new mutable segment[%s].", segment.getIdentifier()); @@ -378,9 +378,9 @@ private void addSegment(final DruidServerMetadata server, final DataSegment segm } else { if (knownSegments.containsKey(segment)) { final SegmentMetadataHolder holder = knownSegments.get(segment); - final Map> segmentServerMap = holder.getReplicas(); + final Set segmentServers = holder.getReplicas(); final ImmutableSet servers = new ImmutableSet.Builder() - .addAll(segmentServerMap.get(segment.getIdentifier())) + .addAll(segmentServers) .add(server.getName()) .build(); final SegmentMetadataHolder holderWithNumReplicas = new SegmentMetadataHolder.Builder( @@ -388,8 +388,10 @@ private void addSegment(final DruidServerMetadata server, final DataSegment segm holder.isPublished(), holder.isAvailable(), holder.isRealtime(), - holder.getReplicas() - ).withReplicas(ImmutableMap.of(segment.getIdentifier(), servers)).build(); + holder.getReplicas(), + holder.getRowSignature(), + holder.getNumRows() + ).withReplicas(servers).build(); knownSegments.put(segment, holderWithNumReplicas); } if (server.segmentReplicatable()) { @@ -435,8 +437,8 @@ private void removeServerSegment(final DruidServerMetadata server, final DataSeg log.debug("Segment[%s] is gone from server[%s]", segment.getIdentifier(), server.getName()); final Map knownSegments = segmentMetadataInfo.get(segment.getDataSource()); final SegmentMetadataHolder holder = knownSegments.get(segment); - final Map> segmentServerMap = holder.getReplicas(); - final ImmutableSet servers = FluentIterable.from(segmentServerMap.get(segment.getIdentifier())) + final Set segmentServers = holder.getReplicas(); + final ImmutableSet servers = FluentIterable.from(segmentServers) .filter(Predicates.not(Predicates.equalTo(server.getName()))) .toSet(); final SegmentMetadataHolder holderWithNumReplicas = new SegmentMetadataHolder.Builder( @@ -444,8 +446,10 @@ private void removeServerSegment(final DruidServerMetadata server, final DataSeg holder.isPublished(), holder.isAvailable(), holder.isRealtime(), - holder.getReplicas() - ).withReplicas(ImmutableMap.of(segment.getIdentifier(), servers)).build(); + holder.getReplicas(), + holder.getRowSignature(), + holder.getNumRows() + ).withReplicas(servers).build(); knownSegments.put(segment, holderWithNumReplicas); lock.notifyAll(); } @@ -523,10 +527,12 @@ private Set refreshSegmentsForDataSource( holder.isPublished(), holder.isAvailable(), holder.isRealtime(), - holder.getReplicas() + holder.getReplicas(), + holder.getRowSignature(), + holder.getNumRows() ).withRowSignature(rowSignature).withNumRows(analysis.getNumRows()).build(); dataSourceSegments.put(segment, updatedHolder); - setSegmentSignature(segment, updatedHolder); + setSegmentMetadataHolder(segment, updatedHolder); retVal.add(segment); } } @@ -549,11 +555,16 @@ private Set refreshSegmentsForDataSource( return retVal; } - private void setSegmentSignature(final DataSegment segment, final SegmentMetadataHolder segmentMetadataHolder) + private void setSegmentMetadataHolder(final DataSegment segment, final SegmentMetadataHolder segmentMetadataHolder) { synchronized (lock) { - segmentMetadataInfo.computeIfAbsent(segment.getDataSource(), x -> new TreeMap<>(SEGMENT_ORDER)) - .put(segment, segmentMetadataHolder); + Map innerMap = segmentMetadataInfo.get(segment.getDataSource()); + if (innerMap == null) { + segmentMetadataInfo.computeIfAbsent(segment.getDataSource(), x -> new TreeMap<>(SEGMENT_ORDER)) + .put(segment, segmentMetadataHolder); + } else { + innerMap.put(segment, segmentMetadataHolder); + } } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java index 34a5af7b4088..909d2dbc556d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java @@ -22,7 +22,6 @@ import org.apache.druid.sql.calcite.table.RowSignature; import javax.annotation.Nullable; -import java.util.Map; import java.util.Set; /** @@ -38,8 +37,8 @@ public class SegmentMetadataHolder private final long isAvailable; private final long isRealtime; private final String segmentId; - //segmentId -> set of servers that contain the segment - private final Map> segmentServerMap; + //set of servers that contain the segment + private final Set segmentServers; private final long numRows; @Nullable private final RowSignature rowSignature; @@ -50,7 +49,7 @@ private SegmentMetadataHolder(Builder builder) this.isPublished = builder.isPublished; this.isAvailable = builder.isAvailable; this.isRealtime = builder.isRealtime; - this.segmentServerMap = builder.segmentServerMap; + this.segmentServers = builder.segmentServers; this.numRows = builder.numRows; this.segmentId = builder.segmentId; } @@ -75,14 +74,14 @@ public String getSegmentId() return segmentId; } - public Map> getReplicas() + public Set getReplicas() { - return segmentServerMap; + return segmentServers; } - public long getNumReplicas(String segmentId) + public long getNumReplicas() { - return segmentServerMap.get(segmentId).size(); + return segmentServers.size(); } public long getNumRows() @@ -103,7 +102,7 @@ public static class Builder private final long isAvailable; private final long isRealtime; - private Map> segmentServerMap; + private Set segmentServers; @Nullable private RowSignature rowSignature; private long numRows; @@ -113,14 +112,18 @@ public Builder( long isPublished, long isAvailable, long isRealtime, - Map> segmentServerMap + Set servers, + RowSignature rowSignature, + long numRows ) { this.segmentId = segmentId; this.isPublished = isPublished; this.isAvailable = isAvailable; this.isRealtime = isRealtime; - this.segmentServerMap = segmentServerMap; + this.segmentServers = servers; + this.rowSignature = rowSignature; + this.numRows = numRows; } public Builder withRowSignature(RowSignature rowSignature) @@ -135,9 +138,9 @@ public Builder withNumRows(long numRows) return this; } - public Builder withReplicas(Map> segmentServerMap) + public Builder withReplicas(Set servers) { - this.segmentServerMap = segmentServerMap; + this.segmentServers = servers; return this; } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 7a62a0b82557..57e8825af460 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -229,7 +229,7 @@ public Enumerable scan(DataContext root) final Map partialSegmentDataMap = availableSegmentMetadata.values().stream().collect( Collectors.toMap( SegmentMetadataHolder::getSegmentId, - h -> new PartialSegmentData(h.isAvailable(), h.isRealtime(), h.getNumReplicas(h.getSegmentId()), h.getNumRows()) + h -> new PartialSegmentData(h.isAvailable(), h.isRealtime(), h.getNumReplicas(), h.getNumRows()) )); //get published segments from coordinator From 97721d152b194c0de4081cb7795b9859df883a55 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Thu, 24 Jan 2019 14:52:21 -0800 Subject: [PATCH 2/7] Remove unnecessary code and update test --- .../druid/sql/calcite/schema/DruidSchema.java | 14 +++++------- .../sql/calcite/schema/SystemSchemaTest.java | 22 ++++++++++++++++--- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index 1f12890a6984..2e060273398d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -541,15 +541,11 @@ private Set refreshSegmentsForDataSource(final String dataSource, f private void setSegmentMetadataHolder(final DataSegment segment, final SegmentMetadataHolder segmentMetadataHolder) { synchronized (lock) { - final Map innerMap = segmentMetadataInfo.get(segment.getDataSource()); - final SegmentMetadataHolder retVal; - if (innerMap == null) { - retVal = segmentMetadataInfo.computeIfAbsent(segment.getDataSource(), x -> new TreeMap<>(SEGMENT_ORDER)) - .put(segment, segmentMetadataHolder); - } else { - retVal = innerMap.put(segment, segmentMetadataHolder); - } - if (retVal == null) { + TreeMap dataSourceSegments = segmentMetadataInfo.computeIfAbsent( + segment.getDataSource(), + x -> new TreeMap<>(SEGMENT_ORDER) + ); + if (dataSourceSegments.put(segment, segmentMetadataHolder) == null) { totalSegments++; } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 407ff69ed955..350674817dc8 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -115,6 +115,11 @@ public class SystemSchemaTest extends CalciteTestBase CalciteTests.createRow(ImmutableMap.of("t", "2001-01-03", "m1", "6.0")) ); + private static final List ROWS3 = ImmutableList.of( + CalciteTests.createRow(ImmutableMap.of("t", "2001-01-01", "m1", "7.0", "dim3", ImmutableList.of("x"))), + CalciteTests.createRow(ImmutableMap.of("t", "2001-01-02", "m1", "8.0", "dim3", ImmutableList.of("xyz"))) + ); + private SystemSchema schema; private SpecificSegmentsQuerySegmentWalker walker; private DruidLeaderClient client; @@ -200,11 +205,22 @@ public Authorizer getAuthorizer(String name) ) .rows(ROWS2) .buildMMappedIndex(); + final QueryableIndex index3 = IndexBuilder.create() + .tmpDir(new File(tmpDir, "3")) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema( + new IncrementalIndexSchema.Builder() + .withMetrics(new LongSumAggregatorFactory("m1", "m1")) + .withRollup(false) + .build() + ) + .rows(ROWS3) + .buildMMappedIndex(); walker = new SpecificSegmentsQuerySegmentWalker(conglomerate) .add(segment1, index1) .add(segment2, index2) - .add(segment3, index2); + .add(segment3, index3); druidSchema = new DruidSchema( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), @@ -523,7 +539,7 @@ public Object get(String name) 100L, 2L, //partition_num 1L, //num_replicas - 3L, //numRows + 2L, //numRows 0L, //is_published 1L, //is_available 0L //is_realtime @@ -535,7 +551,7 @@ public Object get(String name) 100L, 0L, //partition_num 1L, //num_replicas - 0L, //numRows = 3 + 0L, //numRows 0L, //is_published 1L, //is_available 1L //is_realtime From b27a3d496431ee5d0d8fe8936105b45dbfabbeb0 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 25 Jan 2019 20:54:16 -0800 Subject: [PATCH 3/7] Add unit test for num_rows --- .../druid/sql/calcite/schema/DruidSchema.java | 7 +++- .../sql/calcite/schema/DruidSchemaTest.java | 39 ++++++++++++++++++- .../calcite/util/TestServerInventoryView.java | 12 +++++- 3 files changed, 54 insertions(+), 4 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index 2e060273398d..f7ddfb44c5b0 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -19,6 +19,7 @@ package org.apache.druid.sql.calcite.schema; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; import com.google.common.collect.FluentIterable; @@ -350,7 +351,8 @@ protected Multimap getFunctionMultim return builder.build(); } - private void addSegment(final DruidServerMetadata server, final DataSegment segment) + @VisibleForTesting + protected void addSegment(final DruidServerMetadata server, final DataSegment segment) { synchronized (lock) { final Map knownSegments = segmentMetadataInfo.get(segment.getDataSource()); @@ -538,7 +540,8 @@ private Set refreshSegmentsForDataSource(final String dataSource, f return retVal; } - private void setSegmentMetadataHolder(final DataSegment segment, final SegmentMetadataHolder segmentMetadataHolder) + @VisibleForTesting + protected void setSegmentMetadataHolder(final DataSegment segment, final SegmentMetadataHolder segmentMetadataHolder) { synchronized (lock) { TreeMap dataSourceSegments = segmentMetadataInfo.computeIfAbsent( diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java index 1b4e00819bb6..d6059584e4b9 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java @@ -27,6 +27,8 @@ import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.schema.Table; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.client.ImmutableDruidServer; +import org.apache.druid.client.TimelineServerView; import org.apache.druid.data.input.InputRow; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; @@ -40,6 +42,7 @@ import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.security.NoopEscalator; import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.table.DruidTable; @@ -63,6 +66,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; public class DruidSchemaTest extends CalciteTestBase { @@ -83,6 +87,8 @@ public class DruidSchemaTest extends CalciteTestBase private static QueryRunnerFactoryConglomerate conglomerate; private static Closer resourceCloser; + private List druidServers; + @BeforeClass public static void setUpClass() { @@ -162,10 +168,12 @@ public void setUp() throws Exception index2 ); + final TimelineServerView serverView = new TestServerInventoryView(walker.getSegments()); + druidServers = serverView.getDruidServers(); schema = new DruidSchema( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), - new TestServerInventoryView(walker.getSegments()), + serverView, PLANNER_CONFIG_DEFAULT, new NoopViewManager(), new NoopEscalator() @@ -237,4 +245,33 @@ public void testGetTableMapFoo2() Assert.assertEquals("m1", fields.get(2).getName()); Assert.assertEquals(SqlTypeName.BIGINT, fields.get(2).getType().getSqlTypeName()); } + + @Test + public void testSegmentMetadataHolderNumRows() + { + Map segmentsMetadata = schema.getSegmentMetadata(); + Set segments = segmentsMetadata.keySet(); + Assert.assertEquals(segments.size(), 3); + DataSegment existingSegment = segments.stream().findFirst().orElse(null); + Assert.assertFalse(existingSegment == null); + SegmentMetadataHolder existingHolder = segmentsMetadata.get(existingSegment); + SegmentMetadataHolder updatedHolder = SegmentMetadataHolder.from(existingHolder).withNumRows(5).build(); + schema.setSegmentMetadataHolder(existingSegment, updatedHolder); + ImmutableDruidServer server = null; + for (ImmutableDruidServer druidServer : druidServers) { + for (DataSegment segment : druidServer.getSegments()) { + if (segment == existingSegment) { + server = druidServer; + } + } + } + Assert.assertFalse(server == null); + final DruidServerMetadata druidServerMetadata = server.getMetadata(); + schema.addSegment(druidServerMetadata, existingSegment); + segmentsMetadata = schema.getSegmentMetadata(); + existingSegment = segments.stream().findFirst().orElse(null); + final SegmentMetadataHolder currentHolder = segmentsMetadata.get(existingSegment); + Assert.assertEquals(updatedHolder.getNumRows(), currentHolder.getNumRows()); + } + } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java index 6718b1bd1f80..2dcc56959837 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java @@ -20,7 +20,9 @@ package org.apache.druid.sql.calcite.util; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import org.apache.druid.client.DruidServer; +import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.selector.ServerSelector; @@ -33,6 +35,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.Executor; @@ -83,7 +86,14 @@ public TimelineLookup getTimeline(DataSource dataSource) @Override public List getDruidServers() { - throw new UnsupportedOperationException(); + final ImmutableDruidDataSource dataSource = new ImmutableDruidDataSource("DUMMY", Collections.emptyMap(), segments); + final ImmutableDruidServer server = new ImmutableDruidServer( + DUMMY_SERVER, + 0L, + ImmutableMap.of("src", dataSource), + 1 + ); + return ImmutableList.of(server); } @Override From 57778a1a9ccba06dd96ea752feb9b11fc78d87a5 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Thu, 7 Feb 2019 10:16:24 -0800 Subject: [PATCH 4/7] PR comments --- .../druid/sql/calcite/schema/DruidSchema.java | 5 +- .../sql/calcite/schema/DruidSchemaTest.java | 60 ++++++++++++++----- 2 files changed, 47 insertions(+), 18 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index 20f3e47c2745..b45cf981c86d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -107,12 +107,13 @@ public class DruidSchema extends AbstractSchema // For awaitInitialization. private final CountDownLatch initialized = new CountDownLatch(1); - // Protects access to segmentSignatures, mutableSegments, segmentsNeedingRefresh, lastRefresh, isServerViewInitialized + // Protects access to segmentSignatures, mutableSegments, segmentsNeedingRefresh, lastRefresh, isServerViewInitialized, segmentMetadata private final Object lock = new Object(); // DataSource -> Segment -> SegmentMetadataHolder(contains RowSignature) for that segment. // Use TreeMap for segments so they are merged in deterministic order, from older to newer. - // This data structure need to be accessed in a thread-safe way via lock Object + // This data structure need to be accessed in a thread-safe way via lock Object since segments can be added, + //removed, refreshed or accessed asynchronously private final Map> segmentMetadataInfo = new HashMap<>(); private int totalSegments = 0; diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java index 4ea32606d19d..3e8b6578f78c 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java @@ -246,32 +246,60 @@ public void testGetTableMapFoo2() Assert.assertEquals(SqlTypeName.BIGINT, fields.get(2).getType().getSqlTypeName()); } + /** + * This tests that {@link SegmentMetadataHolder#getNumRows()} is correct in case + * of multiple replicas i.e. when {@link DruidSchema#addSegment(DruidServerMetadata, DataSegment)} + * is called more than once for same segment + */ @Test public void testSegmentMetadataHolderNumRows() { Map segmentsMetadata = schema.getSegmentMetadata(); - Set segments = segmentsMetadata.keySet(); - Assert.assertEquals(segments.size(), 3); - DataSegment existingSegment = segments.stream().findFirst().orElse(null); - Assert.assertFalse(existingSegment == null); - SegmentMetadataHolder existingHolder = segmentsMetadata.get(existingSegment); + final Set segments = segmentsMetadata.keySet(); + Assert.assertEquals(3, segments.size()); + // find the only segment with datasource "foo2" + final DataSegment existingSegment = segments.stream() + .filter(segment -> segment.getDataSource().equals("foo2")) + .findFirst() + .orElse(null); + Assert.assertNotNull(existingSegment); + final SegmentMetadataHolder existingHolder = segmentsMetadata.get(existingSegment); + //update SegmentMetadataHolder of existingSegment with numRows=5 SegmentMetadataHolder updatedHolder = SegmentMetadataHolder.from(existingHolder).withNumRows(5).build(); schema.setSegmentMetadataHolder(existingSegment, updatedHolder); - ImmutableDruidServer server = null; - for (ImmutableDruidServer druidServer : druidServers) { - for (DataSegment segment : druidServer.getSegments()) { - if (segment == existingSegment) { - server = druidServer; - } - } - } - Assert.assertFalse(server == null); + //find a druidServer holding existingSegment + final Pair pair = druidServers.stream() + .flatMap(druidServer -> druidServer.getSegments() + .stream() + .filter(segment -> segment + .equals( + existingSegment)) + .map(segment -> Pair + .of( + druidServer, + segment + ))) + .findAny() + .orElse(null); + Assert.assertNotNull(pair); + final ImmutableDruidServer server = pair.lhs; + Assert.assertNotNull(server); final DruidServerMetadata druidServerMetadata = server.getMetadata(); + //invoke DruidSchema#addSegment on existingSegment schema.addSegment(druidServerMetadata, existingSegment); segmentsMetadata = schema.getSegmentMetadata(); - existingSegment = segments.stream().findFirst().orElse(null); - final SegmentMetadataHolder currentHolder = segmentsMetadata.get(existingSegment); + // get the only segment with datasource "foo2" + final DataSegment currentSegment = segments.stream() + .filter(segment -> segment.getDataSource().equals("foo2")) + .findFirst() + .orElse(null); + final SegmentMetadataHolder currentHolder = segmentsMetadata.get(currentSegment); + Assert.assertEquals(updatedHolder.getSegmentId(), currentHolder.getSegmentId()); Assert.assertEquals(updatedHolder.getNumRows(), currentHolder.getNumRows()); + //numreplicas do not change here since we addSegment with the same server which was serving existingSegment before + Assert.assertEquals(updatedHolder.getNumReplicas(), currentHolder.getNumReplicas()); + Assert.assertEquals(updatedHolder.isAvailable(), currentHolder.isAvailable()); + Assert.assertEquals(updatedHolder.isPublished(), currentHolder.isPublished()); } @Test From 8eb2ac09dbcbce9c2f47fce0ebf893391169e20b Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Thu, 7 Feb 2019 23:05:02 -0800 Subject: [PATCH 5/7] change access modifier to default package level --- .../org/apache/druid/sql/calcite/schema/DruidSchema.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index b45cf981c86d..c764042a919d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -353,7 +353,7 @@ protected Multimap getFunctionMultim } @VisibleForTesting - protected void addSegment(final DruidServerMetadata server, final DataSegment segment) + void addSegment(final DruidServerMetadata server, final DataSegment segment) { synchronized (lock) { final Map knownSegments = segmentMetadataInfo.get(segment.getDataSource()); @@ -408,7 +408,7 @@ protected void addSegment(final DruidServerMetadata server, final DataSegment se } @VisibleForTesting - protected void removeSegment(final DataSegment segment) + void removeSegment(final DataSegment segment) { synchronized (lock) { log.debug("Segment[%s] is gone.", segment.getId()); @@ -457,7 +457,7 @@ private void removeServerSegment(final DruidServerMetadata server, final DataSeg * which may be a subset of the asked-for set. */ @VisibleForTesting - protected Set refreshSegments(final Set segments) throws IOException + Set refreshSegments(final Set segments) throws IOException { final Set retVal = new HashSet<>(); @@ -555,7 +555,7 @@ private Set refreshSegmentsForDataSource(final String dataSource, f } @VisibleForTesting - protected void setSegmentMetadataHolder(final DataSegment segment, final SegmentMetadataHolder segmentMetadataHolder) + void setSegmentMetadataHolder(final DataSegment segment, final SegmentMetadataHolder segmentMetadataHolder) { synchronized (lock) { TreeMap dataSourceSegments = segmentMetadataInfo.computeIfAbsent( From e29bedbac57f82a68c8caae14f16247b24e24816 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 8 Feb 2019 15:53:13 -0800 Subject: [PATCH 6/7] minor changes to comments --- .../apache/druid/sql/calcite/schema/DruidSchema.java | 2 +- .../sql/calcite/schema/SegmentMetadataHolder.java | 2 +- .../druid/sql/calcite/schema/DruidSchemaTest.java | 10 +++++----- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index c764042a919d..1777a412cb6d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -113,7 +113,7 @@ public class DruidSchema extends AbstractSchema // DataSource -> Segment -> SegmentMetadataHolder(contains RowSignature) for that segment. // Use TreeMap for segments so they are merged in deterministic order, from older to newer. // This data structure need to be accessed in a thread-safe way via lock Object since segments can be added, - //removed, refreshed or accessed asynchronously + // removed, refreshed or accessed asynchronously private final Map> segmentMetadataInfo = new HashMap<>(); private int totalSegments = 0; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java index 41f8ce8458a2..38ff92858ec0 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java @@ -63,7 +63,7 @@ public static Builder from(SegmentMetadataHolder h) private final long isPublished; private final long isAvailable; private final long isRealtime; - //set of servers that contain the segment + // set of servers that contain the segment private final Set segmentServers; private final long numRows; @Nullable diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java index 3e8b6578f78c..e707c40aeb5c 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java @@ -264,10 +264,10 @@ public void testSegmentMetadataHolderNumRows() .orElse(null); Assert.assertNotNull(existingSegment); final SegmentMetadataHolder existingHolder = segmentsMetadata.get(existingSegment); - //update SegmentMetadataHolder of existingSegment with numRows=5 + // update SegmentMetadataHolder of existingSegment with numRows=5 SegmentMetadataHolder updatedHolder = SegmentMetadataHolder.from(existingHolder).withNumRows(5).build(); schema.setSegmentMetadataHolder(existingSegment, updatedHolder); - //find a druidServer holding existingSegment + // find a druidServer holding existingSegment final Pair pair = druidServers.stream() .flatMap(druidServer -> druidServer.getSegments() .stream() @@ -285,7 +285,7 @@ public void testSegmentMetadataHolderNumRows() final ImmutableDruidServer server = pair.lhs; Assert.assertNotNull(server); final DruidServerMetadata druidServerMetadata = server.getMetadata(); - //invoke DruidSchema#addSegment on existingSegment + // invoke DruidSchema#addSegment on existingSegment schema.addSegment(druidServerMetadata, existingSegment); segmentsMetadata = schema.getSegmentMetadata(); // get the only segment with datasource "foo2" @@ -296,7 +296,7 @@ public void testSegmentMetadataHolderNumRows() final SegmentMetadataHolder currentHolder = segmentsMetadata.get(currentSegment); Assert.assertEquals(updatedHolder.getSegmentId(), currentHolder.getSegmentId()); Assert.assertEquals(updatedHolder.getNumRows(), currentHolder.getNumRows()); - //numreplicas do not change here since we addSegment with the same server which was serving existingSegment before + // numreplicas do not change here since we addSegment with the same server which was serving existingSegment before Assert.assertEquals(updatedHolder.getNumReplicas(), currentHolder.getNumReplicas()); Assert.assertEquals(updatedHolder.isAvailable(), currentHolder.isAvailable()); Assert.assertEquals(updatedHolder.isPublished(), currentHolder.isPublished()); @@ -328,7 +328,7 @@ public void testNullSegmentMetadataHolder() throws IOException Map segmentMetadatas = schema.getSegmentMetadata(); Set segments = segmentMetadatas.keySet(); Assert.assertEquals(segments.size(), 3); - //remove one of the segments with datasource "foo" + // remove one of the segments with datasource "foo" final DataSegment segmentToRemove = segments.stream() .filter(segment -> segment.getDataSource().equals("foo")) .findFirst() From 11b84dd03fef21d03faf1d8becfeb8fef84d779a Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Sun, 10 Feb 2019 15:29:17 -0800 Subject: [PATCH 7/7] PR comments --- .../druid/sql/calcite/schema/DruidSchema.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index 1777a412cb6d..4aaa95e12a63 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -19,6 +19,7 @@ package org.apache.druid.sql.calcite.schema; +import com.amazonaws.annotation.GuardedBy; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; @@ -94,9 +95,9 @@ public class DruidSchema extends AbstractSchema private static final EmittingLogger log = new EmittingLogger(DruidSchema.class); private static final int MAX_SEGMENTS_PER_QUERY = 15000; - private static final long IS_PUBLISHED = 0; - private static final long IS_AVAILABLE = 1; - private static final long NUM_ROWS = 0; + private static final long DEFAULT_IS_PUBLISHED = 0; + private static final long DEFAULT_IS_AVAILABLE = 1; + private static final long DEFAULT_NUM_ROWS = 0; private final QueryLifecycleFactory queryLifecycleFactory; private final PlannerConfig config; @@ -112,8 +113,7 @@ public class DruidSchema extends AbstractSchema // DataSource -> Segment -> SegmentMetadataHolder(contains RowSignature) for that segment. // Use TreeMap for segments so they are merged in deterministic order, from older to newer. - // This data structure need to be accessed in a thread-safe way via lock Object since segments can be added, - // removed, refreshed or accessed asynchronously + @GuardedBy("lock") private final Map> segmentMetadataInfo = new HashMap<>(); private int totalSegments = 0; @@ -365,12 +365,12 @@ void addSegment(final DruidServerMetadata server, final DataSegment segment) final Set servers = ImmutableSet.of(server.getName()); holder = SegmentMetadataHolder.builder( segment.getId(), - IS_PUBLISHED, - IS_AVAILABLE, + DEFAULT_IS_PUBLISHED, + DEFAULT_IS_AVAILABLE, isRealtime, servers, null, - NUM_ROWS + DEFAULT_NUM_ROWS ).build(); // Unknown segment. setSegmentMetadataHolder(segment, holder);