From 1f342cfb4686f274077710a12e6c99042e4261ba Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Mon, 8 Apr 2019 12:00:13 -0700 Subject: [PATCH 01/17] Add is_overshadowed column to sys.segments table --- .../timeline/SegmentWithOvershadowInfo.java | 89 +++++++++++++++++++ docs/content/querying/sql.md | 1 + .../druid/server/http/MetadataResource.java | 89 ++++++++++++++----- .../calcite/schema/MetadataSegmentView.java | 30 ++++--- .../sql/calcite/schema/SystemSchema.java | 42 +++++---- .../sql/calcite/schema/SystemSchemaTest.java | 44 +++++---- 6 files changed, 231 insertions(+), 64 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowInfo.java diff --git a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowInfo.java b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowInfo.java new file mode 100644 index 000000000000..918e97696432 --- /dev/null +++ b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowInfo.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.timeline; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * DataSegment and overshadow info for the segment. An immutable object. + * + * SegmentWithOvershadowInfo's {@link #compareTo} method considers only the {@link SegmentId} of the DataSegment object. + */ +public class SegmentWithOvershadowInfo implements Comparable +{ + private final boolean isOvershadowed; + private final DataSegment dataSegment; + + @JsonCreator + public SegmentWithOvershadowInfo( + @JsonProperty("dataSegment") DataSegment dataSegment, + @JsonProperty("overshadowed") boolean isOvershadowed + ) + { + this.dataSegment = dataSegment; + this.isOvershadowed = isOvershadowed; + } + + @JsonProperty + public boolean isOvershadowed() + { + return isOvershadowed; + } + + @JsonProperty + public DataSegment getDataSegment() + { + return dataSegment; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof SegmentWithOvershadowInfo)) { + return false; + } + final SegmentWithOvershadowInfo that = (SegmentWithOvershadowInfo) o; + if (!dataSegment.equals(that.dataSegment)) { + return false; + } + if (isOvershadowed != (that.isOvershadowed)) { + return false; + } + return true; + } + + @Override + public int hashCode() + { + int result = dataSegment.hashCode(); + result = 31 * result + Boolean.hashCode(isOvershadowed); + return result; + } + + @Override + public int compareTo(SegmentWithOvershadowInfo o) + { + return getDataSegment().getId().compareTo(dataSegment.getId()); + } +} diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md index a9e781bbf51c..6af1029ab490 100644 --- a/docs/content/querying/sql.md +++ b/docs/content/querying/sql.md @@ -605,6 +605,7 @@ Note that a segment can be served by more than one stream ingestion tasks or His |is_published|LONG|Boolean is represented as long type where 1 = true, 0 = false. 
1 represents this segment has been published to the metadata store with `used=1`| |is_available|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is currently being served by any process(Historical or realtime)| |is_realtime|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is being served on any type of realtime tasks| +|is_overshadowed|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is overshadowed by some other published segment. You can query `is_published && !is_overshadowed` to find segments which should be available and compare those to `is_available` for debugging purposes |payload|STRING|JSON-serialized data segment payload| For example to retrieve all segments for datasource "wikipedia", use the query: diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index c7e270214ff4..f8645e3c5def 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -19,12 +19,11 @@ package org.apache.druid.server.http; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.collect.Collections2; import com.google.common.collect.Iterables; +import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; @@ -39,6 +38,8 @@ import org.apache.druid.server.security.ResourceAction; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.SegmentWithOvershadowInfo; +import org.apache.druid.timeline.VersionedIntervalTimeline; import 
org.joda.time.Interval; import javax.servlet.http.HttpServletRequest; @@ -51,10 +52,12 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import javax.ws.rs.core.StreamingOutput; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; @@ -151,7 +154,8 @@ public Response getDatabaseSegmentDataSource(@PathParam("dataSourceName") final @Produces(MediaType.APPLICATION_JSON) public Response getDatabaseSegments( @Context final HttpServletRequest req, - @QueryParam("datasources") final Set datasources + @QueryParam("datasources") final Set datasources, + @QueryParam("includeOvershadowInfo") final String includeOvershadowInfo ) { Collection druidDataSources = metadataSegmentManager.getDataSources(); @@ -164,26 +168,69 @@ public Response getDatabaseSegments( .stream() .flatMap(t -> t.getSegments().stream()); - final Function> raGenerator = segment -> Collections.singletonList( - AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource())); + if (includeOvershadowInfo != null) { + final Set overshadowedSegments = findOvershadowedSegments(druidDataSources); + //transform DataSegment to SegmentWithOvershadowInfo objects + final Stream metadataSegmentWithOvershadowInfo = metadataSegments.map(segment -> { + if (overshadowedSegments.contains(segment.getId())) { + return new SegmentWithOvershadowInfo(segment, true); + } else { + return new SegmentWithOvershadowInfo(segment, false); + } + }).collect(Collectors.toList()).stream(); - final Iterable authorizedSegments = - AuthorizationUtils.filterAuthorizedResources(req, metadataSegments::iterator, raGenerator, authorizerMapper); + final Function> raGenerator = segment -> Collections.singletonList( + 
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource())); - final StreamingOutput stream = outputStream -> { - final JsonFactory jsonFactory = jsonMapper.getFactory(); - try (final JsonGenerator jsonGenerator = jsonFactory.createGenerator(outputStream)) { - jsonGenerator.writeStartArray(); - for (DataSegment ds : authorizedSegments) { - jsonGenerator.writeObject(ds); - jsonGenerator.flush(); - } - jsonGenerator.writeEndArray(); - } - }; + final Iterable authorizedSegments = + AuthorizationUtils.filterAuthorizedResources( + req, + metadataSegmentWithOvershadowInfo::iterator, + raGenerator, + authorizerMapper + ); + Response.ResponseBuilder builder = Response.status(Response.Status.OK); + return builder.entity(authorizedSegments).build(); + } else { - Response.ResponseBuilder builder = Response.status(Response.Status.OK); - return builder.entity(stream).build(); + final Function> raGenerator = segment -> Collections.singletonList( + AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource())); + + final Iterable authorizedSegments = + AuthorizationUtils.filterAuthorizedResources(req, metadataSegments::iterator, raGenerator, authorizerMapper); + + Response.ResponseBuilder builder = Response.status(Response.Status.OK); + return builder.entity(authorizedSegments).build(); + } + + } + + /** + * find fully overshadowed segments + * + * @param druidDataSources + * + * @return set of overshadowed segments + */ + private Set findOvershadowedSegments(Collection druidDataSources) + { + final Stream segmentStream = druidDataSources + .stream() + .flatMap(t -> t.getSegments().stream()); + final Set usedSegments = segmentStream.collect(Collectors.toSet()); + final Map> timelines = new HashMap<>(); + usedSegments.forEach(segment -> timelines + .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural())) + .add(segment.getInterval(), segment.getVersion(), 
segment.getShardSpec().createChunk(segment))); + + final Set overshadowedSegments = new HashSet<>(); + for (DataSegment dataSegment : usedSegments) { + final VersionedIntervalTimeline timeline = timelines.get(dataSegment.getDataSource()); + if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) { + overshadowedSegments.add(dataSegment.getId()); + } + } + return overshadowedSegments; } @GET diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 50fe3133cd28..de138de3ec8c 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -43,6 +43,7 @@ import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentWithOvershadowInfo; import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.DateTime; @@ -51,8 +52,7 @@ import java.io.InputStream; import java.util.Iterator; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -73,8 +73,10 @@ public class MetadataSegmentView private final BrokerSegmentWatcherConfig segmentWatcherConfig; private final boolean isCacheEnabled; + // Use ConcurrentSkipListMap so that the order of segments is deterministic and + // sys.segments queries return the segments in sorted order based on segmentId @Nullable - private final ConcurrentMap publishedSegments; + private final ConcurrentSkipListMap publishedSegments; private final 
ScheduledExecutorService scheduledExec; private final long pollPeriodInMS; private final LifecycleLock lifecycleLock = new LifecycleLock(); @@ -96,7 +98,7 @@ public MetadataSegmentView( this.segmentWatcherConfig = segmentWatcherConfig; this.isCacheEnabled = plannerConfig.isMetadataSegmentCacheEnable(); this.pollPeriodInMS = plannerConfig.getMetadataSegmentPollPeriod(); - this.publishedSegments = isCacheEnabled ? new ConcurrentHashMap<>(1000) : null; + this.publishedSegments = isCacheEnabled ? new ConcurrentSkipListMap<>() : null; this.scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d"); } @@ -134,7 +136,7 @@ public void stop() private void poll() { log.info("polling published segments from coordinator"); - final JsonParserIterator metadataSegments = getMetadataSegments( + final JsonParserIterator metadataSegments = getMetadataSegments( coordinatorDruidLeaderClient, jsonMapper, responseHandler, @@ -143,10 +145,16 @@ private void poll() final DateTime timestamp = DateTimes.nowUtc(); while (metadataSegments.hasNext()) { - final DataSegment interned = DataSegmentInterner.intern(metadataSegments.next()); + final SegmentWithOvershadowInfo segment = metadataSegments.next(); + final DataSegment interned = DataSegmentInterner.intern(segment.getDataSegment()); + final SegmentWithOvershadowInfo segmentWithOvershadowInfo = new SegmentWithOvershadowInfo( + interned, + segment.isOvershadowed() + ); // timestamp is used to filter deleted segments - publishedSegments.put(interned, timestamp); + publishedSegments.put(segmentWithOvershadowInfo, timestamp); } + // filter the segments from cache whose timestamp is not equal to latest timestamp stored, // since the presence of a segment with an earlier timestamp indicates that // "that" segment is not returned by coordinator in latest poll, so it's @@ -160,7 +168,7 @@ private void poll() cachePopulated.set(true); } - public Iterator getPublishedSegments() + public Iterator getPublishedSegments() { if 
(isCacheEnabled) { Preconditions.checkState( @@ -179,14 +187,14 @@ public Iterator getPublishedSegments() } // Note that coordinator must be up to get segments - private JsonParserIterator getMetadataSegments( + private JsonParserIterator getMetadataSegments( DruidLeaderClient coordinatorClient, ObjectMapper jsonMapper, BytesAccumulatingResponseHandler responseHandler, Set watchedDataSources ) { - String query = "/druid/coordinator/v1/metadata/segments"; + String query = "/druid/coordinator/v1/metadata/segments?includeOvershadowInfo"; if (watchedDataSources != null && !watchedDataSources.isEmpty()) { log.debug( "filtering datasources in published segments based on broker's watchedDataSources[%s]", watchedDataSources); @@ -213,7 +221,7 @@ private JsonParserIterator getMetadataSegments( responseHandler ); - final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference() + final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference() { }); return new JsonParserIterator<>( diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 6316b40f991f..f4259f4778ed 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -69,6 +69,7 @@ import org.apache.druid.sql.calcite.table.RowSignature; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.SegmentWithOvershadowInfo; import org.jboss.netty.handler.codec.http.HttpMethod; import javax.annotation.Nullable; @@ -92,6 +93,9 @@ public class SystemSchema extends AbstractSchema private static final String SERVER_SEGMENTS_TABLE = "server_segments"; private static final String TASKS_TABLE = "tasks"; + private static final long AVAILABLE_IS_OVERSHADOWED_VALUE = 0L; + private static final long 
PUBLISHED_IS_PUBLISHED_VALUE = 1L; + static final RowSignature SEGMENTS_SIGNATURE = RowSignature .builder() .add("segment_id", ValueType.STRING) @@ -106,6 +110,7 @@ public class SystemSchema extends AbstractSchema .add("is_published", ValueType.LONG) .add("is_available", ValueType.LONG) .add("is_realtime", ValueType.LONG) + .add("is_overshadowed", ValueType.LONG) .add("payload", ValueType.STRING) .build(); @@ -229,7 +234,7 @@ public Enumerable scan(DataContext root) } //get published segments from metadata segment cache (if enabled in sql planner config), else directly from coordinator - final Iterator metadataSegments = metadataView.getPublishedSegments(); + final Iterator metadataSegments = metadataView.getPublishedSegments(); final Set segmentsAlreadySeen = new HashSet<>(); @@ -240,8 +245,9 @@ public Enumerable scan(DataContext root) )) .transform(val -> { try { - segmentsAlreadySeen.add(val.getId()); - final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getId()); + final DataSegment segment = val.getDataSegment(); + segmentsAlreadySeen.add(segment.getId()); + final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(segment.getId()); long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 0L; if (partialSegmentData != null) { numReplicas = partialSegmentData.getNumReplicas(); @@ -250,23 +256,24 @@ public Enumerable scan(DataContext root) isRealtime = partialSegmentData.isRealtime(); } return new Object[]{ - val.getId(), - val.getDataSource(), - val.getInterval().getStart().toString(), - val.getInterval().getEnd().toString(), - val.getSize(), - val.getVersion(), - Long.valueOf(val.getShardSpec().getPartitionNum()), + segment.getId(), + segment.getDataSource(), + segment.getInterval().getStart().toString(), + segment.getInterval().getEnd().toString(), + segment.getSize(), + segment.getVersion(), + Long.valueOf(segment.getShardSpec().getPartitionNum()), numReplicas, numRows, - 1L, //is_published is true for 
published segments + PUBLISHED_IS_PUBLISHED_VALUE, //is_published is true for published segments isAvailable, isRealtime, + val.isOvershadowed() ? 1L : 0L, jsonMapper.writeValueAsString(val) }; } catch (JsonProcessingException e) { - throw new RE(e, "Error getting segment payload for segment %s", val.getId()); + throw new RE(e, "Error getting segment payload for segment %s", val.getDataSegment().getId()); } }); @@ -295,6 +302,7 @@ public Enumerable scan(DataContext root) val.getValue().isPublished(), val.getValue().isAvailable(), val.getValue().isRealtime(), + AVAILABLE_IS_OVERSHADOWED_VALUE, jsonMapper.writeValueAsString(val.getKey()) }; } @@ -311,18 +319,18 @@ public Enumerable scan(DataContext root) } - private Iterator getAuthorizedPublishedSegments( - Iterator it, + private Iterator getAuthorizedPublishedSegments( + Iterator it, DataContext root ) { final AuthenticationResult authenticationResult = (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT); - Function> raGenerator = segment -> Collections.singletonList( - AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource())); + Function> raGenerator = segment -> Collections.singletonList( + AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource())); - final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources( + final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources( authenticationResult, () -> it, raGenerator, diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index d354f141d9af..861aca5cffcd 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -76,6 +76,7 @@ import org.apache.druid.sql.calcite.view.NoopViewManager; import 
org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.SegmentWithOvershadowInfo; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.EasyMock; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -381,7 +382,7 @@ public void testGetTableMap() final RelDataType rowType = segmentsTable.getRowType(new JavaTypeFactoryImpl()); final List fields = rowType.getFieldList(); - Assert.assertEquals(13, fields.size()); + Assert.assertEquals(14, fields.size()); final SystemSchema.TasksTable tasksTable = (SystemSchema.TasksTable) schema.getTableMap().get("tasks"); final RelDataType sysRowType = tasksTable.getRowType(new JavaTypeFactoryImpl()); @@ -408,11 +409,14 @@ public void testSegmentsTable() .withConstructor(druidSchema, metadataView, mapper, authMapper) .createMock(); EasyMock.replay(segmentsTable); - final Set publishedSegments = Stream.of(publishedSegment1, - publishedSegment2, - publishedSegment3, - segment1, - segment2).collect(Collectors.toSet()); + final Set publishedSegments = Stream.of( + new SegmentWithOvershadowInfo(publishedSegment1, true), + new SegmentWithOvershadowInfo(publishedSegment2, false), + new SegmentWithOvershadowInfo(publishedSegment3, false), + new SegmentWithOvershadowInfo(segment1, true), + new SegmentWithOvershadowInfo(segment2, false) + ).collect(Collectors.toSet()); + EasyMock.expect(metadataView.getPublishedSegments()).andReturn(publishedSegments.iterator()).once(); EasyMock.replay(client, request, responseHolder, responseHandler, metadataView); @@ -463,7 +467,8 @@ public Object get(String name) 3L, //numRows 1L, //is_published 1L, //is_available - 0L //is_realtime + 0L, //is_realtime + 1L //is_overshadowed ); verifyRow( @@ -475,7 +480,8 @@ public Object get(String name) 3L, //numRows 1L, //is_published 1L, //is_available - 0L //is_realtime + 0L, //is_realtime + 0L //is_overshadowed ); //segment test3 is unpublished and has a NumberedShardSpec with 
partitionNum = 2 @@ -488,7 +494,8 @@ public Object get(String name) 2L, //numRows 0L, //is_published 1L, //is_available - 0L //is_realtime + 0L, //is_realtime + 0L //is_overshadowed ); verifyRow( @@ -500,7 +507,8 @@ public Object get(String name) 0L, //numRows 0L, //is_published 1L, //is_available - 1L //is_realtime + 1L, //is_realtime + 0L //is_overshadowed ); verifyRow( @@ -512,7 +520,8 @@ public Object get(String name) 0L, //numRows 0L, //is_published 1L, //is_available - 1L //is_realtime + 1L, //is_realtime + 0L //is_overshadowed ); // wikipedia segments are published and unavailable, num_replicas is 0 @@ -525,7 +534,8 @@ public Object get(String name) 0L, //numRows 1L, //is_published 0L, //is_available - 0L //is_realtime + 0L, //is_realtime + 1L //is_overshadowed ); verifyRow( @@ -537,7 +547,8 @@ public Object get(String name) 0L, //numRows 1L, //is_published 0L, //is_available - 0L //is_realtime + 0L, //is_realtime + 0L //is_overshadowed ); verifyRow( @@ -549,7 +560,8 @@ public Object get(String name) 0L, //numRows 1L, //is_published 0L, //is_available - 0L //is_realtime + 0L, //is_realtime + 0L //is_overshadowed ); // Verify value types. 
@@ -565,7 +577,8 @@ private void verifyRow( long numRows, long isPublished, long isAvailable, - long isRealtime) + long isRealtime, + long isOvershadowed) { Assert.assertEquals(segmentId, row[0].toString()); SegmentId id = Iterables.get(SegmentId.iterateAllPossibleParsings(segmentId), 0); @@ -580,6 +593,7 @@ private void verifyRow( Assert.assertEquals(isPublished, row[9]); Assert.assertEquals(isAvailable, row[10]); Assert.assertEquals(isRealtime, row[11]); + Assert.assertEquals(isOvershadowed, row[12]); } @Test From 635921de1f0c4864699dab025301635553dec8ab Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Mon, 8 Apr 2019 14:33:42 -0700 Subject: [PATCH 02/17] update docs --- docs/content/querying/sql.md | 2 +- .../java/org/apache/druid/server/http/MetadataResource.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md index 6af1029ab490..5102a062b228 100644 --- a/docs/content/querying/sql.md +++ b/docs/content/querying/sql.md @@ -605,7 +605,7 @@ Note that a segment can be served by more than one stream ingestion tasks or His |is_published|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 represents this segment has been published to the metadata store with `used=1`| |is_available|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is currently being served by any process(Historical or realtime)| |is_realtime|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is being served on any type of realtime tasks| -|is_overshadowed|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is overshadowed by some other published segment. You can query `is_published && !is_overshadowed` to find segments which should be available and compare those to `is_available` for debugging purposes +|is_overshadowed|LONG|Boolean is represented as long type where 1 = true, 0 = false. 
1 if this segment is published and is overshadowed by some other published segments. Currently, is_overshadowed is always false for unpublished segments, although this may change in the future. You can filter for segments that "should be published" by filtering for `is_published = 1 AND is_overshadowed = 0`. Segments can briefly be both published and overshadowed if they were recently replaced, but have not been unpublished yet. |payload|STRING|JSON-serialized data segment payload| For example to retrieve all segments for datasource "wikipedia", use the query: diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index f8645e3c5def..e5e675f6cf4b 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -202,7 +202,6 @@ public Response getDatabaseSegments( Response.ResponseBuilder builder = Response.status(Response.Status.OK); return builder.entity(authorizedSegments).build(); } - } /** From f59e8cc426a6412c03cc0cbae944128d67fed4c4 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 10 Apr 2019 10:55:11 -0700 Subject: [PATCH 03/17] Rename class and variables --- ...ava => SegmentWithOvershadowedStatus.java} | 14 ++++++------ .../druid/server/http/MetadataResource.java | 20 ++++++++--------- .../calcite/schema/MetadataSegmentView.java | 22 +++++++++---------- .../sql/calcite/schema/SystemSchema.java | 12 +++++----- .../sql/calcite/schema/SystemSchemaTest.java | 14 ++++++------ 5 files changed, 41 insertions(+), 41 deletions(-) rename core/src/main/java/org/apache/druid/timeline/{SegmentWithOvershadowInfo.java => SegmentWithOvershadowedStatus.java} (78%) diff --git a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowInfo.java b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java similarity index 78% rename 
from core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowInfo.java rename to core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java index 918e97696432..11aaae0e22c4 100644 --- a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowInfo.java +++ b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java @@ -23,17 +23,17 @@ import com.fasterxml.jackson.annotation.JsonProperty; /** - * DataSegment and overshadow info for the segment. An immutable object. + * DataSegment object plus the overshadowed status for the segment. An immutable object. * - * SegmentWithOvershadowInfo's {@link #compareTo} method considers only the {@link SegmentId} of the DataSegment object. + * SegmentWithOvershadowedStatus's {@link #compareTo} method considers only the {@link SegmentId} of the DataSegment object. */ -public class SegmentWithOvershadowInfo implements Comparable +public class SegmentWithOvershadowedStatus implements Comparable { private final boolean isOvershadowed; private final DataSegment dataSegment; @JsonCreator - public SegmentWithOvershadowInfo( + public SegmentWithOvershadowedStatus( @JsonProperty("dataSegment") DataSegment dataSegment, @JsonProperty("overshadowed") boolean isOvershadowed ) @@ -60,10 +60,10 @@ public boolean equals(Object o) if (this == o) { return true; } - if (!(o instanceof SegmentWithOvershadowInfo)) { + if (!(o instanceof SegmentWithOvershadowedStatus)) { return false; } - final SegmentWithOvershadowInfo that = (SegmentWithOvershadowInfo) o; + final SegmentWithOvershadowedStatus that = (SegmentWithOvershadowedStatus) o; if (!dataSegment.equals(that.dataSegment)) { return false; } @@ -82,7 +82,7 @@ public int hashCode() } @Override - public int compareTo(SegmentWithOvershadowInfo o) + public int compareTo(SegmentWithOvershadowedStatus o) { return getDataSegment().getId().compareTo(dataSegment.getId()); } diff --git 
a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index e5e675f6cf4b..4c5be73a10b6 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -38,7 +38,7 @@ import org.apache.druid.server.security.ResourceAction; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.SegmentWithOvershadowInfo; +import org.apache.druid.timeline.SegmentWithOvershadowedStatus; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.joda.time.Interval; @@ -155,7 +155,7 @@ public Response getDatabaseSegmentDataSource(@PathParam("dataSourceName") final public Response getDatabaseSegments( @Context final HttpServletRequest req, @QueryParam("datasources") final Set datasources, - @QueryParam("includeOvershadowInfo") final String includeOvershadowInfo + @QueryParam("includeOvershadowedStatus") final String includeOvershadowedStatus ) { Collection druidDataSources = metadataSegmentManager.getDataSources(); @@ -168,24 +168,24 @@ public Response getDatabaseSegments( .stream() .flatMap(t -> t.getSegments().stream()); - if (includeOvershadowInfo != null) { + if (includeOvershadowedStatus != null) { final Set overshadowedSegments = findOvershadowedSegments(druidDataSources); - //transform DataSegment to SegmentWithOvershadowInfo objects - final Stream metadataSegmentWithOvershadowInfo = metadataSegments.map(segment -> { + //transform DataSegment to SegmentWithOvershadowedStatus objects + final Stream segmentsWithOvershadowedStatus = metadataSegments.map(segment -> { if (overshadowedSegments.contains(segment.getId())) { - return new SegmentWithOvershadowInfo(segment, true); + return new SegmentWithOvershadowedStatus(segment, true); } else { - return new SegmentWithOvershadowInfo(segment, false); + return new 
SegmentWithOvershadowedStatus(segment, false); } }).collect(Collectors.toList()).stream(); - final Function> raGenerator = segment -> Collections.singletonList( + final Function> raGenerator = segment -> Collections.singletonList( AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource())); - final Iterable authorizedSegments = + final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources( req, - metadataSegmentWithOvershadowInfo::iterator, + segmentsWithOvershadowedStatus::iterator, raGenerator, authorizerMapper ); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index de138de3ec8c..fd9a9039f0f4 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -43,7 +43,7 @@ import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.SegmentWithOvershadowInfo; +import org.apache.druid.timeline.SegmentWithOvershadowedStatus; import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.DateTime; @@ -76,7 +76,7 @@ public class MetadataSegmentView // Use ConcurrentSkipListMap so that the order of segments is deterministic and // sys.segments queries return the segments in sorted order based on segmentId @Nullable - private final ConcurrentSkipListMap publishedSegments; + private final ConcurrentSkipListMap publishedSegments; private final ScheduledExecutorService scheduledExec; private final long pollPeriodInMS; private final LifecycleLock lifecycleLock = new LifecycleLock(); @@ -136,7 +136,7 @@ public void stop() private void poll() { log.info("polling published segments from coordinator"); - final 
JsonParserIterator metadataSegments = getMetadataSegments( + final JsonParserIterator metadataSegments = getMetadataSegments( coordinatorDruidLeaderClient, jsonMapper, responseHandler, @@ -145,14 +145,14 @@ private void poll() final DateTime timestamp = DateTimes.nowUtc(); while (metadataSegments.hasNext()) { - final SegmentWithOvershadowInfo segment = metadataSegments.next(); + final SegmentWithOvershadowedStatus segment = metadataSegments.next(); final DataSegment interned = DataSegmentInterner.intern(segment.getDataSegment()); - final SegmentWithOvershadowInfo segmentWithOvershadowInfo = new SegmentWithOvershadowInfo( + final SegmentWithOvershadowedStatus segmentWithOvershadowedStatus = new SegmentWithOvershadowedStatus( interned, segment.isOvershadowed() ); // timestamp is used to filter deleted segments - publishedSegments.put(segmentWithOvershadowInfo, timestamp); + publishedSegments.put(segmentWithOvershadowedStatus, timestamp); } // filter the segments from cache whose timestamp is not equal to latest timestamp stored, @@ -168,7 +168,7 @@ private void poll() cachePopulated.set(true); } - public Iterator getPublishedSegments() + public Iterator getPublishedSegments() { if (isCacheEnabled) { Preconditions.checkState( @@ -187,14 +187,14 @@ public Iterator getPublishedSegments() } // Note that coordinator must be up to get segments - private JsonParserIterator getMetadataSegments( + private JsonParserIterator getMetadataSegments( DruidLeaderClient coordinatorClient, ObjectMapper jsonMapper, BytesAccumulatingResponseHandler responseHandler, Set watchedDataSources ) { - String query = "/druid/coordinator/v1/metadata/segments?includeOvershadowInfo"; + String query = "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus"; if (watchedDataSources != null && !watchedDataSources.isEmpty()) { log.debug( "filtering datasources in published segments based on broker's watchedDataSources[%s]", watchedDataSources); @@ -203,7 +203,7 @@ private JsonParserIterator 
getMetadataSegments( sb.append("datasources=").append(ds).append("&"); } sb.setLength(sb.length() - 1); - query = "/druid/coordinator/v1/metadata/segments?" + sb; + query = "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus?" + sb; } Request request; try { @@ -221,7 +221,7 @@ private JsonParserIterator getMetadataSegments( responseHandler ); - final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference() + final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference() { }); return new JsonParserIterator<>( diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index f4259f4778ed..00e609acbc05 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -69,7 +69,7 @@ import org.apache.druid.sql.calcite.table.RowSignature; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.SegmentWithOvershadowInfo; +import org.apache.druid.timeline.SegmentWithOvershadowedStatus; import org.jboss.netty.handler.codec.http.HttpMethod; import javax.annotation.Nullable; @@ -234,7 +234,7 @@ public Enumerable scan(DataContext root) } //get published segments from metadata segment cache (if enabled in sql planner config), else directly from coordinator - final Iterator metadataSegments = metadataView.getPublishedSegments(); + final Iterator metadataSegments = metadataView.getPublishedSegments(); final Set segmentsAlreadySeen = new HashSet<>(); @@ -319,18 +319,18 @@ public Enumerable scan(DataContext root) } - private Iterator getAuthorizedPublishedSegments( - Iterator it, + private Iterator getAuthorizedPublishedSegments( + Iterator it, DataContext root ) { final AuthenticationResult authenticationResult = (AuthenticationResult) 
root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT); - Function> raGenerator = segment -> Collections.singletonList( + Function> raGenerator = segment -> Collections.singletonList( AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource())); - final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources( + final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources( authenticationResult, () -> it, raGenerator, diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 861aca5cffcd..a942db4b556c 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -76,7 +76,7 @@ import org.apache.druid.sql.calcite.view.NoopViewManager; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.SegmentWithOvershadowInfo; +import org.apache.druid.timeline.SegmentWithOvershadowedStatus; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.EasyMock; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -409,12 +409,12 @@ public void testSegmentsTable() .withConstructor(druidSchema, metadataView, mapper, authMapper) .createMock(); EasyMock.replay(segmentsTable); - final Set publishedSegments = Stream.of( - new SegmentWithOvershadowInfo(publishedSegment1, true), - new SegmentWithOvershadowInfo(publishedSegment2, false), - new SegmentWithOvershadowInfo(publishedSegment3, false), - new SegmentWithOvershadowInfo(segment1, true), - new SegmentWithOvershadowInfo(segment2, false) + final Set publishedSegments = Stream.of( + new SegmentWithOvershadowedStatus(publishedSegment1, true), + new SegmentWithOvershadowedStatus(publishedSegment2, false), + new 
SegmentWithOvershadowedStatus(publishedSegment3, false), + new SegmentWithOvershadowedStatus(segment1, true), + new SegmentWithOvershadowedStatus(segment2, false) ).collect(Collectors.toSet()); EasyMock.expect(metadataView.getPublishedSegments()).andReturn(publishedSegments.iterator()).once(); From f899c0dcabb1055981263c10845101868faa7b0b Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Thu, 11 Apr 2019 14:01:59 -0700 Subject: [PATCH 04/17] PR comments --- .../timeline/SegmentWithOvershadowedStatus.java | 12 ++++++------ docs/content/querying/sql.md | 2 +- .../apache/druid/server/http/MetadataResource.java | 13 ++++++------- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java index 11aaae0e22c4..dd103776a76b 100644 --- a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java +++ b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java @@ -29,23 +29,23 @@ */ public class SegmentWithOvershadowedStatus implements Comparable { - private final boolean isOvershadowed; + private final boolean overshadowed; private final DataSegment dataSegment; @JsonCreator public SegmentWithOvershadowedStatus( @JsonProperty("dataSegment") DataSegment dataSegment, - @JsonProperty("overshadowed") boolean isOvershadowed + @JsonProperty("overshadowed") boolean overshadowed ) { this.dataSegment = dataSegment; - this.isOvershadowed = isOvershadowed; + this.overshadowed = overshadowed; } @JsonProperty public boolean isOvershadowed() { - return isOvershadowed; + return overshadowed; } @JsonProperty @@ -67,7 +67,7 @@ public boolean equals(Object o) if (!dataSegment.equals(that.dataSegment)) { return false; } - if (isOvershadowed != (that.isOvershadowed)) { + if (overshadowed != (that.overshadowed)) { return false; } return true; @@ -77,7 +77,7 @@ public boolean 
equals(Object o) public int hashCode() { int result = dataSegment.hashCode(); - result = 31 * result + Boolean.hashCode(isOvershadowed); + result = 31 * result + Boolean.hashCode(overshadowed); return result; } diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md index fbe3ae691b1e..bd8513658d88 100644 --- a/docs/content/querying/sql.md +++ b/docs/content/querying/sql.md @@ -609,7 +609,7 @@ Note that a segment can be served by more than one stream ingestion tasks or His |is_published|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 represents this segment has been published to the metadata store with `used=1`| |is_available|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is currently being served by any process(Historical or realtime)| |is_realtime|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is being served on any type of realtime tasks| -|is_overshadowed|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is published and is overshadowed by some other published segments. Currently, is_overshadowed is always false for unpublished segments, although this may change in the future. You can filter for segments that "should be published" by filtering for `is_published = 1 AND is_overshadowed = 0`. Segments can briefly be both published and overshadowed if they were recently replaced, but have not been unpublished yet. +|is_overshadowed|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is published and is _fully_ overshadowed by some other published segments. Currently, is_overshadowed is always false for unpublished segments, although this may change in the future. You can filter for segments that "should be published" by filtering for `is_published = 1 AND is_overshadowed = 0`. 
Segments can briefly be both published and overshadowed if they were recently replaced, but have not been unpublished yet. |payload|STRING|JSON-serialized data segment payload| For example to retrieve all segments for datasource "wikipedia", use the query: diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index 4c5be73a10b6..4328409f35ac 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -171,13 +171,12 @@ public Response getDatabaseSegments( if (includeOvershadowedStatus != null) { final Set overshadowedSegments = findOvershadowedSegments(druidDataSources); //transform DataSegment to SegmentWithOvershadowedStatus objects - final Stream segmentsWithOvershadowedStatus = metadataSegments.map(segment -> { - if (overshadowedSegments.contains(segment.getId())) { - return new SegmentWithOvershadowedStatus(segment, true); - } else { - return new SegmentWithOvershadowedStatus(segment, false); - } - }).collect(Collectors.toList()).stream(); + final Stream segmentsWithOvershadowedStatus = metadataSegments + .map( + segment -> new SegmentWithOvershadowedStatus( + segment, + overshadowedSegments.contains(segment.getId()) + )).collect(Collectors.toList()).stream(); final Function> raGenerator = segment -> Collections.singletonList( AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource())); From 4afb4d59fb7198663976500ca87243c8d0376f1a Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 12 Apr 2019 16:34:52 -0700 Subject: [PATCH 05/17] PR comments --- .../timeline/VersionedIntervalTimeline.java | 9 ++++ .../helper/DruidCoordinatorRuleRunner.java | 11 +--- .../druid/server/http/MetadataResource.java | 53 +++++++++---------- .../calcite/schema/MetadataSegmentView.java | 5 +- .../sql/calcite/schema/SystemSchema.java | 
11 ++-- 5 files changed, 45 insertions(+), 44 deletions(-) diff --git a/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java b/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java index 36c177dfb0f0..bbc24afdc10b 100644 --- a/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java +++ b/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java @@ -109,6 +109,15 @@ public static void addSegments( ); } + public static Map> buildTimelines(Iterable segments) + { + final Map> timelines = new HashMap<>(); + segments.forEach(segment -> timelines + .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural())) + .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment))); + return timelines; + } + @VisibleForTesting public Map> getAllTimelineEntries() { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java index 7d7830170c99..d6eff5f3b0ef 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java @@ -20,7 +20,6 @@ package org.apache.druid.server.coordinator.helper; import com.google.common.collect.Lists; -import com.google.common.collect.Ordering; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.MetadataRuleManager; @@ -36,7 +35,6 @@ import org.apache.druid.timeline.VersionedIntervalTimeline; import org.joda.time.DateTime; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -141,13 +139,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) 
private Set determineOvershadowedSegments(DruidCoordinatorRuntimeParams params) { - Map> timelines = new HashMap<>(); - for (DataSegment segment : params.getAvailableSegments()) { - timelines - .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural())) - .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)); - } - + final Map> timelines = VersionedIntervalTimeline.buildTimelines( + params.getAvailableSegments()); Set overshadowed = new HashSet<>(); for (VersionedIntervalTimeline timeline : timelines.values()) { for (TimelineObjectHolder holder : timeline.findOvershadowed()) { diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index 2bb7783e5a7d..4c8fff36f821 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -23,7 +23,6 @@ import com.google.common.base.Function; import com.google.common.collect.Collections2; import com.google.common.collect.Iterables; -import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; @@ -54,7 +53,6 @@ import javax.ws.rs.core.Response; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -177,24 +175,20 @@ public Response getDatabaseSegments( if (includeOvershadowedStatus != null) { final Set overshadowedSegments = findOvershadowedSegments(druidDataSources); - //transform DataSegment to SegmentWithOvershadowedStatus objects - final Stream segmentsWithOvershadowedStatus = metadataSegments - .map( - segment -> new SegmentWithOvershadowedStatus( - segment, - overshadowedSegments.contains(segment.getId()) - 
)).collect(Collectors.toList()).stream(); + final Stream segmentsWithOvershadowedStatus = metadataSegments.map(segment -> new SegmentWithOvershadowedStatus( + segment, + overshadowedSegments.contains(segment.getId()) + )); final Function> raGenerator = segment -> Collections.singletonList( AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource())); - final Iterable authorizedSegments = - AuthorizationUtils.filterAuthorizedResources( - req, - segmentsWithOvershadowedStatus::iterator, - raGenerator, - authorizerMapper - ); + final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources( + req, + segmentsWithOvershadowedStatus::iterator, + raGenerator, + authorizerMapper + ); Response.ResponseBuilder builder = Response.status(Response.Status.OK); return builder.entity(authorizedSegments).build(); } else { @@ -202,8 +196,12 @@ public Response getDatabaseSegments( final Function> raGenerator = segment -> Collections.singletonList( AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource())); - final Iterable authorizedSegments = - AuthorizationUtils.filterAuthorizedResources(req, metadataSegments::iterator, raGenerator, authorizerMapper); + final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources( + req, + metadataSegments::iterator, + raGenerator, + authorizerMapper + ); Response.ResponseBuilder builder = Response.status(Response.Status.OK); return builder.entity(authorizedSegments).build(); @@ -211,7 +209,7 @@ public Response getDatabaseSegments( } /** - * find fully overshadowed segments + * This method finds the fully overshadowed segments from the given druidDataSources * * @param druidDataSources * @@ -222,17 +220,16 @@ private Set findOvershadowedSegments(Collection segmentStream = druidDataSources .stream() .flatMap(t -> t.getSegments().stream()); - final Set usedSegments = segmentStream.collect(Collectors.toSet()); - final Map> timelines = new HashMap<>(); - 
usedSegments.forEach(segment -> timelines - .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural())) - .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment))); + final Map> timelines = VersionedIntervalTimeline.buildTimelines( + () -> segmentStream.iterator()); final Set overshadowedSegments = new HashSet<>(); - for (DataSegment dataSegment : usedSegments) { - final VersionedIntervalTimeline timeline = timelines.get(dataSegment.getDataSource()); - if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) { - overshadowedSegments.add(dataSegment.getId()); + for (ImmutableDruidDataSource dataSource : druidDataSources) { + for (DataSegment dataSegment : dataSource.getSegments()) { + final VersionedIntervalTimeline timeline = timelines.get(dataSegment.getDataSource()); + if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) { + overshadowedSegments.add(dataSegment.getId()); + } } } return overshadowedSegments; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index fd9a9039f0f4..30953476be2f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -73,8 +73,9 @@ public class MetadataSegmentView private final BrokerSegmentWatcherConfig segmentWatcherConfig; private final boolean isCacheEnabled; - // Use ConcurrentSkipListMap so that the order of segments is deterministic and - // sys.segments queries return the segments in sorted order based on segmentId + /** + * Use {@link ConcurrentSkipListMap} so that the order of segments is deterministic and sys.segments queries return the segments in sorted order based on segmentId + */ @Nullable private 
final ConcurrentSkipListMap publishedSegments; private final ScheduledExecutorService scheduledExec; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 00e609acbc05..fa7a652817dd 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -93,8 +93,9 @@ public class SystemSchema extends AbstractSchema private static final String SERVER_SEGMENTS_TABLE = "server_segments"; private static final String TASKS_TABLE = "tasks"; - private static final long AVAILABLE_IS_OVERSHADOWED_VALUE = 0L; - private static final long PUBLISHED_IS_PUBLISHED_VALUE = 1L; + private static final long IS_OVERSHADOWED_FALSE = 0L; + private static final long IS_OVERSHADOWED_TRUE = 1L; + private static final long IS_PUBLISHED_TRUE = 1L; static final RowSignature SEGMENTS_SIGNATURE = RowSignature .builder() @@ -265,10 +266,10 @@ public Enumerable scan(DataContext root) Long.valueOf(segment.getShardSpec().getPartitionNum()), numReplicas, numRows, - PUBLISHED_IS_PUBLISHED_VALUE, //is_published is true for published segments + IS_PUBLISHED_TRUE, //is_published is true for published segments isAvailable, isRealtime, - val.isOvershadowed() ? 1L : 0L, + val.isOvershadowed() ? 
IS_OVERSHADOWED_TRUE : IS_OVERSHADOWED_FALSE, jsonMapper.writeValueAsString(val) }; } @@ -302,7 +303,7 @@ public Enumerable scan(DataContext root) val.getValue().isPublished(), val.getValue().isAvailable(), val.getValue().isRealtime(), - AVAILABLE_IS_OVERSHADOWED_VALUE, + IS_OVERSHADOWED_FALSE, // there is an assumption here that unpublished segments are never overshadowed jsonMapper.writeValueAsString(val.getKey()) }; } From 75f69f1db9dffcf523eb8045586def3863290003 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Sat, 13 Apr 2019 17:56:47 -0700 Subject: [PATCH 06/17] remove unused variables in MetadataResource --- .../java/org/apache/druid/server/http/MetadataResource.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index 4c8fff36f821..256bf0a645e4 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -69,9 +69,7 @@ public class MetadataResource { private final MetadataSegmentManager metadataSegmentManager; private final IndexerMetadataStorageCoordinator metadataStorageCoordinator; - private final AuthConfig authConfig; private final AuthorizerMapper authorizerMapper; - private final ObjectMapper jsonMapper; @Inject public MetadataResource( @@ -84,9 +82,7 @@ public MetadataResource( { this.metadataSegmentManager = metadataSegmentManager; this.metadataStorageCoordinator = metadataStorageCoordinator; - this.authConfig = authConfig; this.authorizerMapper = authorizerMapper; - this.jsonMapper = jsonMapper; } @GET From 9022399e47599fe38097a1c947e325cfbf3cf196 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Mon, 15 Apr 2019 09:32:24 -0700 Subject: [PATCH 07/17] move constants together --- .../sql/calcite/schema/SystemSchema.java | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 
deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 27e6bcd501f0..18f4c31b5958 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -93,9 +93,16 @@ public class SystemSchema extends AbstractSchema private static final String SERVER_SEGMENTS_TABLE = "server_segments"; private static final String TASKS_TABLE = "tasks"; + /** + * Booleans constants represented as long type, + * where 1 = true and 0 = false to make it easy to count number of segments + * which are published, available etc. + */ + private static final long IS_PUBLISHED_FALSE = 0L; + private static final long IS_PUBLISHED_TRUE = 1L; + private static final long IS_AVAILABLE_TRUE = 1L; private static final long IS_OVERSHADOWED_FALSE = 0L; private static final long IS_OVERSHADOWED_TRUE = 1L; - private static final long IS_PUBLISHED_TRUE = 1L; static final RowSignature SEGMENTS_SIGNATURE = RowSignature .builder() @@ -195,14 +202,6 @@ static class SegmentsTable extends AbstractTable implements ScannableTable private final AuthorizerMapper authorizerMapper; private final MetadataSegmentView metadataView; - /** - * Booleans constants used for available segments represented as long type, - * where 1 = true and 0 = false to make it easy to count number of segments - * which are published, available - */ - private static final long DEFAULT_IS_PUBLISHED = 0; - private static final long DEFAULT_IS_AVAILABLE = 1; - public SegmentsTable( DruidSchema druidSchemna, MetadataSegmentView metadataView, @@ -241,7 +240,7 @@ public Enumerable scan(DataContext root) Maps.newHashMapWithExpectedSize(druidSchema.getTotalSegments()); for (AvailableSegmentMetadata h : availableSegmentMetadata.values()) { PartialSegmentData partialSegmentData = - new PartialSegmentData(DEFAULT_IS_AVAILABLE, 
h.isRealtime(), h.getNumReplicas(), h.getNumRows()); + new PartialSegmentData(IS_AVAILABLE_TRUE, h.isRealtime(), h.getNumReplicas(), h.getNumRows()); partialSegmentDataMap.put(h.getSegmentId(), partialSegmentData); } @@ -311,8 +310,8 @@ public Enumerable scan(DataContext root) Long.valueOf(val.getKey().getShardSpec().getPartitionNum()), numReplicas, val.getValue().getNumRows(), - DEFAULT_IS_PUBLISHED, - DEFAULT_IS_AVAILABLE, + IS_PUBLISHED_FALSE, // is_published is false for unpublished segments + IS_AVAILABLE_TRUE, // is_available is assumed to be always true for segments announced by historicals or realtime tasks val.getValue().isRealtime(), IS_OVERSHADOWED_FALSE, // there is an assumption here that unpublished segments are never overshadowed jsonMapper.writeValueAsString(val.getKey()) From fd5c8b73c81b698e9ffc55d1811d650e7ea15963 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 16 Apr 2019 12:42:21 -0700 Subject: [PATCH 08/17] add getFullyOvershadowedSegments method to ImmutableDruidDataSource --- .../client/ImmutableDruidDataSource.java | 24 +++++++++++++ .../druid/server/http/MetadataResource.java | 34 +++---------------- 2 files changed, 28 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java index 59539443b618..44ed56da1078 100644 --- a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java +++ b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java @@ -27,10 +27,13 @@ import com.google.common.collect.ImmutableSortedMap; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.VersionedIntervalTimeline; import java.util.Collection; +import java.util.HashSet; import java.util.Map; import java.util.Objects; +import java.util.Set; /** * An immutable collection of metadata of segments ({@link DataSegment} 
objects), belonging to a particular data source. @@ -109,6 +112,27 @@ public long getTotalSizeOfSegments() return totalSizeOfSegments; } + /** + * This method finds the fully overshadowed segments in this datasource + * + * @return set of overshadowed segments + */ + public Set getFullyOvershadowedSegments() + { + final Collection segments = this.getSegments(); + final Map> timelines = VersionedIntervalTimeline.buildTimelines( + segments); + + final Set overshadowedSegments = new HashSet<>(); + for (DataSegment dataSegment : segments) { + final VersionedIntervalTimeline timeline = timelines.get(dataSegment.getDataSource()); + if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) { + overshadowedSegments.add(dataSegment.getId()); + } + } + return overshadowedSegments; + } + @Override public String toString() { diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index e1f101337375..9c09f197c40b 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -38,7 +38,6 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentWithOvershadowedStatus; -import org.apache.druid.timeline.VersionedIntervalTimeline; import org.joda.time.Interval; import javax.servlet.http.HttpServletRequest; @@ -55,7 +54,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.TreeSet; @@ -165,7 +163,10 @@ public Response getDatabaseSegments( final Stream metadataSegments = dataSourceStream.flatMap(t -> t.getSegments().stream()); if (includeOvershadowedStatus != null) { - final Set overshadowedSegments = findOvershadowedSegments(druidDataSources); + 
final Set overshadowedSegments = new HashSet<>(); + for (ImmutableDruidDataSource dataSource : druidDataSources) { + overshadowedSegments.addAll(dataSource.getFullyOvershadowedSegments()); + } final Stream segmentsWithOvershadowedStatus = metadataSegments.map(segment -> new SegmentWithOvershadowedStatus( segment, overshadowedSegments.contains(segment.getId()) @@ -199,33 +200,6 @@ public Response getDatabaseSegments( } } - /** - * This method finds the fully overshadowed segments from the given druidDataSources - * - * @param druidDataSources - * - * @return set of overshadowed segments - */ - private Set findOvershadowedSegments(Collection druidDataSources) - { - final Stream segmentStream = druidDataSources - .stream() - .flatMap(t -> t.getSegments().stream()); - final Map> timelines = VersionedIntervalTimeline.buildTimelines( - () -> segmentStream.iterator()); - - final Set overshadowedSegments = new HashSet<>(); - for (ImmutableDruidDataSource dataSource : druidDataSources) { - for (DataSegment dataSegment : dataSource.getSegments()) { - final VersionedIntervalTimeline timeline = timelines.get(dataSegment.getDataSource()); - if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) { - overshadowedSegments.add(dataSegment.getId()); - } - } - } - return overshadowedSegments; - } - @GET @Path("/datasources/{dataSourceName}/segments") @Produces(MediaType.APPLICATION_JSON) From 95ad416512e39d9eeea14f6c284b8caa20b04ca2 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Thu, 18 Apr 2019 13:19:55 -0700 Subject: [PATCH 09/17] Fix compareTo of SegmentWithOvershadowedStatus --- .../apache/druid/timeline/SegmentWithOvershadowedStatus.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java index dd103776a76b..0ca65faf7912 100644 --- 
a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java +++ b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java @@ -84,6 +84,6 @@ public int hashCode() @Override public int compareTo(SegmentWithOvershadowedStatus o) { - return getDataSegment().getId().compareTo(dataSegment.getId()); + return dataSegment.getId().compareTo(o.dataSegment.getId()); } } From cd7f468448946a68b36347f7f580cb0c2d31a74f Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 19 Apr 2019 14:31:38 -0700 Subject: [PATCH 10/17] PR comment --- .../apache/druid/sql/calcite/schema/MetadataSegmentView.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 30953476be2f..e5cbfee76377 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -204,7 +204,7 @@ private JsonParserIterator getMetadataSegments( sb.append("datasources=").append(ds).append("&"); } sb.setLength(sb.length() - 1); - query = "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus?" 
+ sb; + query = "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&" + sb; } Request request; try { From 48f95c051aa32c2ca5c02c9ed8390831e467638a Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Sat, 20 Apr 2019 17:19:53 -0700 Subject: [PATCH 11/17] PR comments --- .../calcite/schema/MetadataSegmentView.java | 46 ++++++++++++++----- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index e5cbfee76377..37f8ce70ac09 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; import org.apache.druid.client.BrokerSegmentWatcherConfig; @@ -32,7 +33,6 @@ import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.guice.ManageLifecycle; -import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; @@ -45,7 +45,6 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentWithOvershadowedStatus; import org.jboss.netty.handler.codec.http.HttpMethod; -import org.joda.time.DateTime; import javax.annotation.Nullable; import java.io.IOException; @@ -75,9 +74,10 @@ public class MetadataSegmentView private final boolean isCacheEnabled; /** * Use {@link ConcurrentSkipListMap} so that the order of segments is deterministic and sys.segments 
queries return the segments in sorted order based on segmentId + * {@link CachedSegmentOvershadowedStatus} contains the overshadow status and the timestamp for the DataSegment */ @Nullable - private final ConcurrentSkipListMap publishedSegments; + private final ConcurrentSkipListMap publishedSegments; private final ScheduledExecutorService scheduledExec; private final long pollPeriodInMS; private final LifecycleLock lifecycleLock = new LifecycleLock(); @@ -144,16 +144,12 @@ private void poll() segmentWatcherConfig.getWatchedDataSources() ); - final DateTime timestamp = DateTimes.nowUtc(); + final long timestamp = System.currentTimeMillis(); while (metadataSegments.hasNext()) { final SegmentWithOvershadowedStatus segment = metadataSegments.next(); final DataSegment interned = DataSegmentInterner.intern(segment.getDataSegment()); - final SegmentWithOvershadowedStatus segmentWithOvershadowedStatus = new SegmentWithOvershadowedStatus( - interned, - segment.isOvershadowed() - ); // timestamp is used to filter deleted segments - publishedSegments.put(segmentWithOvershadowedStatus, timestamp); + publishedSegments.put(interned, new CachedSegmentOvershadowedStatus(segment.isOvershadowed(), timestamp)); } // filter the segments from cache whose timestamp is not equal to latest timestamp stored, @@ -165,7 +161,7 @@ private void poll() // we are incrementally removing deleted segments from the map // This means publishedSegments will be eventually consistent with // the segments in coordinator - publishedSegments.entrySet().removeIf(e -> e.getValue() != timestamp); + publishedSegments.entrySet().removeIf(e -> e.getValue().getTimestamp() != timestamp); cachePopulated.set(true); } @@ -176,7 +172,13 @@ public Iterator getPublishedSegments() lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS) && cachePopulated.get(), "hold on, still syncing published segments" ); - return publishedSegments.keySet().iterator(); + return Iterators.transform( + 
publishedSegments.entrySet().iterator(), + input -> new SegmentWithOvershadowedStatus( + input.getKey(), + input.getValue().isOvershadowed() + ) + ); } else { return getMetadataSegments( coordinatorDruidLeaderClient, @@ -260,5 +262,27 @@ public void run() } } } + + private static class CachedSegmentOvershadowedStatus + { + final boolean overshadowed; + final long timestamp; + + public CachedSegmentOvershadowedStatus(boolean overshadowed, long timestamp) + { + this.overshadowed = overshadowed; + this.timestamp = timestamp; + } + + public boolean isOvershadowed() + { + return overshadowed; + } + + public long getTimestamp() + { + return timestamp; + } + } } From 0c6d27f21b35a4bda9458e2a413a4f015bb16afb Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Mon, 22 Apr 2019 17:37:32 -0700 Subject: [PATCH 12/17] PR comments --- .../client/ImmutableDruidDataSource.java | 10 ++-- .../helper/DruidCoordinatorRuleRunner.java | 4 +- .../druid/server/http/MetadataResource.java | 47 ++++++++++++------- 3 files changed, 38 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java index 44ed56da1078..82def3de43d2 100644 --- a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java +++ b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java @@ -113,15 +113,15 @@ public long getTotalSizeOfSegments() } /** - * This method finds the fully overshadowed segments in this datasource + * This method finds the overshadowed segments in this datasource * - * @return set of overshadowed segments + * @return set of overshadowed segment ids */ - public Set getFullyOvershadowedSegments() + public Set determineOvershadowedSegments() { final Collection segments = this.getSegments(); - final Map> timelines = VersionedIntervalTimeline.buildTimelines( - segments); + final Map> timelines = VersionedIntervalTimeline + 
.buildTimelines(segments); final Set overshadowedSegments = new HashSet<>(); for (DataSegment dataSegment : segments) { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java index d6eff5f3b0ef..53f02bbe12cf 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java @@ -139,8 +139,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) private Set determineOvershadowedSegments(DruidCoordinatorRuntimeParams params) { - final Map> timelines = VersionedIntervalTimeline.buildTimelines( - params.getAvailableSegments()); + final Map> timelines = VersionedIntervalTimeline + .buildTimelines(params.getAvailableSegments()); Set overshadowed = new HashSet<>(); for (VersionedIntervalTimeline timeline : timelines.values()) { for (TimelineObjectHolder holder : timeline.findOvershadowed()) { diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index 9c09f197c40b..8e9604d97869 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -163,23 +163,10 @@ public Response getDatabaseSegments( final Stream metadataSegments = dataSourceStream.flatMap(t -> t.getSegments().stream()); if (includeOvershadowedStatus != null) { - final Set overshadowedSegments = new HashSet<>(); - for (ImmutableDruidDataSource dataSource : druidDataSources) { - overshadowedSegments.addAll(dataSource.getFullyOvershadowedSegments()); - } - final Stream segmentsWithOvershadowedStatus = metadataSegments.map(segment -> new SegmentWithOvershadowedStatus( - segment, - 
overshadowedSegments.contains(segment.getId()) - )); - - final Function> raGenerator = segment -> Collections.singletonList( - AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource())); - - final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources( + final Iterable authorizedSegments = findAuthorizedSegmentWithOvershadowedStatus( req, - segmentsWithOvershadowedStatus::iterator, - raGenerator, - authorizerMapper + druidDataSources, + metadataSegments ); Response.ResponseBuilder builder = Response.status(Response.Status.OK); return builder.entity(authorizedSegments).build(); @@ -200,6 +187,34 @@ public Response getDatabaseSegments( } } + private Iterable findAuthorizedSegmentWithOvershadowedStatus( + HttpServletRequest req, + Collection druidDataSources, + Stream metadataSegments + ) + { + final Set overshadowedSegments = new HashSet<>(); + for (ImmutableDruidDataSource dataSource : druidDataSources) { + overshadowedSegments.addAll(dataSource.determineOvershadowedSegments()); + } + final Stream segmentsWithOvershadowedStatus = metadataSegments + .map(segment -> new SegmentWithOvershadowedStatus( + segment, + overshadowedSegments.contains(segment.getId()) + )); + + final Function> raGenerator = segment -> Collections + .singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource())); + + final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources( + req, + segmentsWithOvershadowedStatus::iterator, + raGenerator, + authorizerMapper + ); + return authorizedSegments; + } + @GET @Path("/datasources/{dataSourceName}/segments") @Produces(MediaType.APPLICATION_JSON) From 3ad8e34000a0fd6a41b260e6999160d82421f6cf Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 24 Apr 2019 11:55:26 -0700 Subject: [PATCH 13/17] PR comments --- .../SegmentWithOvershadowedStatus.java | 3 ++- .../client/ImmutableDruidDataSource.java | 7 +++---- 
.../helper/DruidCoordinatorRuleRunner.java | 19 +++---------------- .../druid/server/http/MetadataResource.java | 15 +++++++-------- .../calcite/schema/MetadataSegmentView.java | 3 ++- 5 files changed, 17 insertions(+), 30 deletions(-) diff --git a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java index 0ca65faf7912..e86daea7d86e 100644 --- a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java +++ b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java @@ -25,7 +25,8 @@ /** * DataSegment object plus the overshadowed status for the segment. An immutable object. * - * SegmentWithOvershadowedStatus's {@link #compareTo} method considers only the {@link SegmentId} of the DataSegment object. + * SegmentWithOvershadowedStatus's {@link #compareTo} method considers only the {@link SegmentId} + * of the DataSegment object. */ public class SegmentWithOvershadowedStatus implements Comparable { diff --git a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java index 82def3de43d2..a41a87c9eaf7 100644 --- a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java +++ b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java @@ -117,17 +117,16 @@ public long getTotalSizeOfSegments() * * @return set of overshadowed segment ids */ - public Set determineOvershadowedSegments() + public static Set determineOvershadowedSegments(Iterable segments) { - final Collection segments = this.getSegments(); final Map> timelines = VersionedIntervalTimeline .buildTimelines(segments); - final Set overshadowedSegments = new HashSet<>(); + final Set overshadowedSegments = new HashSet<>(); for (DataSegment dataSegment : segments) { final VersionedIntervalTimeline timeline = 
timelines.get(dataSegment.getDataSource()); if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) { - overshadowedSegments.add(dataSegment.getId()); + overshadowedSegments.add(dataSegment); } } return overshadowedSegments; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java index 53f02bbe12cf..859dedb1ece9 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java @@ -20,6 +20,7 @@ package org.apache.druid.server.coordinator.helper; import com.google.common.collect.Lists; +import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.MetadataRuleManager; @@ -87,7 +88,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) // find available segments which are not overshadowed by other segments in DB // only those would need to be loaded/dropped // anything overshadowed by served segments is dropped automatically by DruidCoordinatorCleanupOvershadowed - Set overshadowed = determineOvershadowedSegments(params); + final Set overshadowed = ImmutableDruidDataSource + .determineOvershadowedSegments(params.getAvailableSegments()); for (String tier : cluster.getTierNames()) { replicatorThrottler.updateReplicationState(tier); @@ -136,19 +138,4 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) return params.buildFromExisting().withCoordinatorStats(stats).build(); } - - private Set determineOvershadowedSegments(DruidCoordinatorRuntimeParams params) - { - final Map> timelines = VersionedIntervalTimeline - 
.buildTimelines(params.getAvailableSegments()); - Set overshadowed = new HashSet<>(); - for (VersionedIntervalTimeline timeline : timelines.values()) { - for (TimelineObjectHolder holder : timeline.findOvershadowed()) { - for (DataSegment dataSegment : holder.getObject().payloads()) { - overshadowed.add(dataSegment); - } - } - } - return overshadowed; - } } diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index 8e9604d97869..3795f08cb465 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -52,7 +52,6 @@ import javax.ws.rs.core.Response; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; @@ -165,7 +164,6 @@ public Response getDatabaseSegments( if (includeOvershadowedStatus != null) { final Iterable authorizedSegments = findAuthorizedSegmentWithOvershadowedStatus( req, - druidDataSources, metadataSegments ); Response.ResponseBuilder builder = Response.status(Response.Status.OK); @@ -189,18 +187,19 @@ public Response getDatabaseSegments( private Iterable findAuthorizedSegmentWithOvershadowedStatus( HttpServletRequest req, - Collection druidDataSources, Stream metadataSegments ) { - final Set overshadowedSegments = new HashSet<>(); - for (ImmutableDruidDataSource dataSource : druidDataSources) { - overshadowedSegments.addAll(dataSource.determineOvershadowedSegments()); - } + // It's fine to add all overshadowed segments to a single collection because only + // a small fraction of the segments in the cluster are expected to be overshadowed, + // so building this collection shouldn't generate a lot of garbage. 
+ final Set overshadowedSegments = ImmutableDruidDataSource + .determineOvershadowedSegments(() -> metadataSegments.iterator()); + final Stream segmentsWithOvershadowedStatus = metadataSegments .map(segment -> new SegmentWithOvershadowedStatus( segment, - overshadowedSegments.contains(segment.getId()) + overshadowedSegments.contains(segment) )); final Function> raGenerator = segment -> Collections diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 37f8ce70ac09..2c0abd8f40f2 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -73,7 +73,8 @@ public class MetadataSegmentView private final boolean isCacheEnabled; /** - * Use {@link ConcurrentSkipListMap} so that the order of segments is deterministic and sys.segments queries return the segments in sorted order based on segmentId + * Use {@link ConcurrentSkipListMap} so that the order of segments is deterministic and + * sys.segments queries return the segments in sorted order based on segmentId * {@link CachedSegmentOvershadowedStatus} contains the overshadow status and the timestamp for the DataSegment */ @Nullable From d6bc303227dc29e9b46202a0922a22d104ce8114 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Thu, 25 Apr 2019 09:21:09 -0700 Subject: [PATCH 14/17] PR comments --- .../helper/DruidCoordinatorRuleRunner.java | 4 -- .../calcite/schema/MetadataSegmentView.java | 69 ++++++------------- 2 files changed, 20 insertions(+), 53 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java index 859dedb1ece9..bbceaafed273 100644 --- 
a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java @@ -32,13 +32,9 @@ import org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.TimelineObjectHolder; -import org.apache.druid.timeline.VersionedIntervalTimeline; import org.joda.time.DateTime; -import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; /** diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 2c0abd8f40f2..f63ab897360c 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; -import com.google.common.collect.Iterators; +import com.google.common.collect.ImmutableSortedSet; import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; import org.apache.druid.client.BrokerSegmentWatcherConfig; @@ -47,11 +47,11 @@ import org.jboss.netty.handler.codec.http.HttpMethod; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import java.io.IOException; import java.io.InputStream; import java.util.Iterator; import java.util.Set; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -72,13 +72,14 @@ public class MetadataSegmentView private final BrokerSegmentWatcherConfig segmentWatcherConfig; private final boolean 
isCacheEnabled; + private final Object lock = new Object(); /** - * Use {@link ConcurrentSkipListMap} so that the order of segments is deterministic and + * Use {@link ImmutableSortedSet} so that the order of segments is deterministic and * sys.segments queries return the segments in sorted order based on segmentId - * {@link CachedSegmentOvershadowedStatus} contains the overshadow status and the timestamp for the DataSegment */ @Nullable - private final ConcurrentSkipListMap publishedSegments; + @GuardedBy("lock") + private ImmutableSortedSet publishedSegments = null; private final ScheduledExecutorService scheduledExec; private final long pollPeriodInMS; private final LifecycleLock lifecycleLock = new LifecycleLock(); @@ -100,7 +101,6 @@ public MetadataSegmentView( this.segmentWatcherConfig = segmentWatcherConfig; this.isCacheEnabled = plannerConfig.isMetadataSegmentCacheEnable(); this.pollPeriodInMS = plannerConfig.getMetadataSegmentPollPeriod(); - this.publishedSegments = isCacheEnabled ? 
new ConcurrentSkipListMap<>() : null; this.scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d"); } @@ -145,24 +145,21 @@ private void poll() segmentWatcherConfig.getWatchedDataSources() ); - final long timestamp = System.currentTimeMillis(); + final ImmutableSortedSet.Builder builder = ImmutableSortedSet.naturalOrder(); while (metadataSegments.hasNext()) { final SegmentWithOvershadowedStatus segment = metadataSegments.next(); final DataSegment interned = DataSegmentInterner.intern(segment.getDataSegment()); - // timestamp is used to filter deleted segments - publishedSegments.put(interned, new CachedSegmentOvershadowedStatus(segment.isOvershadowed(), timestamp)); + final SegmentWithOvershadowedStatus segmentWithOvershadowedStatus = new SegmentWithOvershadowedStatus( + interned, + segment.isOvershadowed() + ); + builder.add(segmentWithOvershadowedStatus); + } + // build operation can be expensive for high cardinality segments, so calling it outside "lock" + final ImmutableSortedSet immutableSortedSet = builder.build(); + synchronized (lock) { + publishedSegments = immutableSortedSet; } - - // filter the segments from cache whose timestamp is not equal to latest timestamp stored, - // since the presence of a segment with an earlier timestamp indicates that - // "that" segment is not returned by coordinator in latest poll, so it's - // likely deleted and therefore we remove it from publishedSegments - // Since segments are not atomically replaced because it can cause high - // memory footprint due to large number of published segments, so - // we are incrementally removing deleted segments from the map - // This means publishedSegments will be eventually consistent with - // the segments in coordinator - publishedSegments.entrySet().removeIf(e -> e.getValue().getTimestamp() != timestamp); cachePopulated.set(true); } @@ -173,13 +170,9 @@ public Iterator getPublishedSegments() lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS) && 
cachePopulated.get(), "hold on, still syncing published segments" ); - return Iterators.transform( - publishedSegments.entrySet().iterator(), - input -> new SegmentWithOvershadowedStatus( - input.getKey(), - input.getValue().isOvershadowed() - ) - ); + synchronized (lock) { + return publishedSegments.iterator(); + } } else { return getMetadataSegments( coordinatorDruidLeaderClient, @@ -263,27 +256,5 @@ public void run() } } } - - private static class CachedSegmentOvershadowedStatus - { - final boolean overshadowed; - final long timestamp; - - public CachedSegmentOvershadowedStatus(boolean overshadowed, long timestamp) - { - this.overshadowed = overshadowed; - this.timestamp = timestamp; - } - - public boolean isOvershadowed() - { - return overshadowed; - } - - public long getTimestamp() - { - return timestamp; - } - } } From 2c692bb9814f95573295718d9bd483f286e70d73 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Thu, 25 Apr 2019 11:37:14 -0700 Subject: [PATCH 15/17] fix issue with already consumed stream --- .../org/apache/druid/server/http/MetadataResource.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index 3795f08cb465..3c7e8ac37b83 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -52,6 +52,7 @@ import javax.ws.rs.core.Response; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; @@ -164,6 +165,7 @@ public Response getDatabaseSegments( if (includeOvershadowedStatus != null) { final Iterable authorizedSegments = findAuthorizedSegmentWithOvershadowedStatus( req, + druidDataSources, metadataSegments ); Response.ResponseBuilder builder = 
Response.status(Response.Status.OK); @@ -187,14 +189,17 @@ public Response getDatabaseSegments( private Iterable findAuthorizedSegmentWithOvershadowedStatus( HttpServletRequest req, + Collection druidDataSources, Stream metadataSegments ) { // It's fine to add all overshadowed segments to a single collection because only // a small fraction of the segments in the cluster are expected to be overshadowed, // so building this collection shouldn't generate a lot of garbage. - final Set overshadowedSegments = ImmutableDruidDataSource - .determineOvershadowedSegments(() -> metadataSegments.iterator()); + final Set overshadowedSegments = new HashSet<>(); + for (ImmutableDruidDataSource dataSource : druidDataSources) { + overshadowedSegments.addAll(ImmutableDruidDataSource.determineOvershadowedSegments(dataSource.getSegments())); + } final Stream segmentsWithOvershadowedStatus = metadataSegments .map(segment -> new SegmentWithOvershadowedStatus( From 75183d0605cf0c9d7e3eac731e955324f4153bdc Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 26 Apr 2019 16:10:54 -0700 Subject: [PATCH 16/17] minor refactoring --- .../timeline/VersionedIntervalTimeline.java | 9 ------- .../client/ImmutableDruidDataSource.java | 25 ++++++++++++++++--- .../calcite/schema/MetadataSegmentView.java | 2 +- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java b/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java index bbc24afdc10b..36c177dfb0f0 100644 --- a/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java +++ b/core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java @@ -109,15 +109,6 @@ public static void addSegments( ); } - public static Map> buildTimelines(Iterable segments) - { - final Map> timelines = new HashMap<>(); - segments.forEach(segment -> timelines - .computeIfAbsent(segment.getDataSource(), dataSource -> new 
VersionedIntervalTimeline<>(Ordering.natural())) - .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment))); - return timelines; - } - @VisibleForTesting public Map> getAllTimelineEntries() { diff --git a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java index a41a87c9eaf7..841b7169458e 100644 --- a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java +++ b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java @@ -25,11 +25,13 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSortedMap; +import com.google.common.collect.Ordering; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Objects; @@ -113,14 +115,13 @@ public long getTotalSizeOfSegments() } /** - * This method finds the overshadowed segments in this datasource + * This method finds the overshadowed segments from the given segments * - * @return set of overshadowed segment ids + * @return set of overshadowed segments */ public static Set determineOvershadowedSegments(Iterable segments) { - final Map> timelines = VersionedIntervalTimeline - .buildTimelines(segments); + final Map> timelines = buildTimelines(segments); final Set overshadowedSegments = new HashSet<>(); for (DataSegment dataSegment : segments) { @@ -132,6 +133,22 @@ public static Set determineOvershadowedSegments(Iterable> buildTimelines( + Iterable segments + ) + { + final Map> timelines = new HashMap<>(); + segments.forEach(segment -> timelines + .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural())) + 
.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment))); + return timelines; + } + @Override public String toString() { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index f63ab897360c..25e56316053b 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSortedSet; import com.google.common.util.concurrent.ListenableFuture; +import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; import org.apache.druid.client.BrokerSegmentWatcherConfig; import org.apache.druid.client.DataSegmentInterner; @@ -47,7 +48,6 @@ import org.jboss.netty.handler.codec.http.HttpMethod; import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; import java.io.IOException; import java.io.InputStream; import java.util.Iterator; From 52e297dc97dba1f3c99b1aedd447914bcb9a0c34 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Mon, 29 Apr 2019 12:43:03 -0700 Subject: [PATCH 17/17] PR comments --- .../calcite/schema/MetadataSegmentView.java | 36 ++++++++----------- 1 file changed, 14 insertions(+), 22 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 25e56316053b..18d288eb6915 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -25,7 +25,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSortedSet; import 
com.google.common.util.concurrent.ListenableFuture; -import com.google.errorprone.annotations.concurrent.GuardedBy; +import com.google.common.util.concurrent.Uninterruptibles; import com.google.inject.Inject; import org.apache.druid.client.BrokerSegmentWatcherConfig; import org.apache.druid.client.DataSegmentInterner; @@ -45,16 +45,16 @@ import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentWithOvershadowedStatus; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.jboss.netty.handler.codec.http.HttpMethod; -import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.util.Iterator; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; /** * This class polls the coordinator in background to keep the latest published segments. @@ -72,18 +72,19 @@ public class MetadataSegmentView private final BrokerSegmentWatcherConfig segmentWatcherConfig; private final boolean isCacheEnabled; - private final Object lock = new Object(); /** * Use {@link ImmutableSortedSet} so that the order of segments is deterministic and - * sys.segments queries return the segments in sorted order based on segmentId + * sys.segments queries return the segments in sorted order based on segmentId. + * + * Volatile since this reference is reassigned in {@code poll()} and then read in {@code getPublishedSegments()} + * from other threads. 
*/ - @Nullable - @GuardedBy("lock") - private ImmutableSortedSet<SegmentWithOvershadowedStatus> publishedSegments = null; + @MonotonicNonNull + private volatile ImmutableSortedSet<SegmentWithOvershadowedStatus> publishedSegments = null; private final ScheduledExecutorService scheduledExec; private final long pollPeriodInMS; private final LifecycleLock lifecycleLock = new LifecycleLock(); - private final AtomicBoolean cachePopulated = new AtomicBoolean(false); + private final CountDownLatch cachePopulated = new CountDownLatch(1); @Inject public MetadataSegmentView( @@ -155,24 +156,15 @@ private void poll() ); builder.add(segmentWithOvershadowedStatus); } - // build operation can be expensive for high cardinality segments, so calling it outside "lock" - final ImmutableSortedSet<SegmentWithOvershadowedStatus> immutableSortedSet = builder.build(); - synchronized (lock) { - publishedSegments = immutableSortedSet; - } - cachePopulated.set(true); + publishedSegments = builder.build(); + cachePopulated.countDown(); } public Iterator<SegmentWithOvershadowedStatus> getPublishedSegments() { if (isCacheEnabled) { - Preconditions.checkState( - lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS) && cachePopulated.get(), - "hold on, still syncing published segments" - ); - synchronized (lock) { - return publishedSegments.iterator(); - } + Uninterruptibles.awaitUninterruptibly(cachePopulated); + return publishedSegments.iterator(); } else { return getMetadataSegments( coordinatorDruidLeaderClient,