Merged (showing changes from all commits)
docs/querying/sql.md (8 changes: 4 additions & 4 deletions)

@@ -1083,10 +1083,10 @@ Segments table provides details on all Druid segments, whether they are publishe
|is_available|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is currently being served by any process(Historical or realtime). See the [Architecture page](../design/architecture.md#segment-lifecycle) for more details.|
|is_realtime|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is _only_ served by realtime tasks, and 0 if any historical process is serving this segment.|
|is_overshadowed|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is published and is _fully_ overshadowed by some other published segments. Currently, is_overshadowed is always false for unpublished segments, although this may change in the future. You can filter for segments that "should be published" by filtering for `is_published = 1 AND is_overshadowed = 0`. Segments can briefly be both published and overshadowed if they were recently replaced, but have not been unpublished yet. See the [Architecture page](../design/architecture.md#segment-lifecycle) for more details.|
-|shardSpec|STRING|The toString of specific `ShardSpec`|
-|dimensions|STRING|The dimensions of the segment|
-|metrics|STRING|The metrics of the segment|
-|last_compaction_state|STRING|The configurations of the compaction task which created this segment. May be null if segment was not created by compaction task.|
+|shard_spec|STRING|JSON-serialized form of the segment `ShardSpec`|
+|dimensions|STRING|JSON-serialized form of the segment dimensions|
+|metrics|STRING|JSON-serialized form of the segment metrics|
+|last_compaction_state|STRING|JSON-serialized form of the compaction task's config (compaction task which created this segment). May be null if segment was not created by compaction task.|

For example to retrieve all segments for datasource "wikipedia", use the query:
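The example query itself is collapsed in this diff view. For reference, a minimal sketch of the kind of query that sentence introduces (assuming the standard `sys.segments` table and `datasource` column; the exact collapsed text may differ):

```sql
-- Sketch of the example the docs refer to: list all segments for the
-- "wikipedia" datasource. The JSON-serialized columns touched by this PR
-- (shard_spec, dimensions, metrics, last_compaction_state) come back as strings.
SELECT *
FROM sys.segments
WHERE datasource = 'wikipedia';
```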

@@ -13,9 +13,9 @@
"is_available": 1,
"is_realtime": 0,
"is_overshadowed": 0,
"shardSpec": "NoneShardSpec",
"dimensions": "[anonymous, area_code, city, continent_code, country_name, dma_code, geo, language, namespace, network, newpage, page, postal_code, region_lookup, robot, unpatrolled, user]",
"metrics": "[added, count, deleted, delta, delta_hist, unique_users, variation]",
"shard_spec": "{\"type\":\"none\"}",
"dimensions": "[\"anonymous\",\"area_code\",\"city\",\"continent_code\",\"country_name\",\"dma_code\",\"geo\",\"language\",\"namespace\",\"network\",\"newpage\",\"page\",\"postal_code\",\"region_lookup\",\"robot\",\"unpatrolled\",\"user\"]",
"metrics": "[\"added\",\"count\",\"deleted\",\"delta\",\"delta_hist\",\"unique_users\",\"variation\"]",
"last_compaction_state": null
}
]
sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java

@@ -19,6 +19,7 @@

package org.apache.druid.sql.calcite.schema;

+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -142,7 +143,7 @@ public class SystemSchema extends AbstractSchema
.add("is_available", ValueType.LONG)
.add("is_realtime", ValueType.LONG)
.add("is_overshadowed", ValueType.LONG)
-.add("shardSpec", ValueType.STRING)
+.add("shard_spec", ValueType.STRING)
Contributor:
`shardSpec` was introduced in this change #9883, and changing it to `shard_spec` would be a breaking change. I don't think we should do this without an upgrade plan.

Contributor Author (@maytasm, Oct 5, 2020):
I don't think there is anything in Druid depending on `shardSpec` (similar to how the linked PR changed `payload` to `shardSpec`, `metrics`, `dimensions`). I can update the document, but I don't think anything will break. The exception would be someone with a custom script reading this field, which is probably unlikely, and that is exactly why we should change this ASAP.

Contributor:
The linked PR added those fields, so we didn't have to worry about backwards incompatibility.

@yuanlihan since you introduced this field, do you have any comments on changing the name of this field?

Contributor Author:
The linked PR removed `payload` and added `shardSpec`, `metrics`, `dimensions`.
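To make the compatibility concern in this thread concrete, here is a hedged sketch of what would break for a hypothetical downstream consumer; the queries are illustrative and are not part of the PR:

```sql
-- Hypothetical downstream query written against the pre-PR schema.
-- It selects the old camelCase column and fails once the rename lands.
SELECT segment_id, "shardSpec"
FROM sys.segments;

-- The same query after this PR: the column is snake_case, and its value is
-- now the JSON-serialized ShardSpec (e.g. '{"type":"none"}') rather than a
-- toString such as 'NoneShardSpec'.
SELECT segment_id, shard_spec
FROM sys.segments;
```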

.add("dimensions", ValueType.STRING)
.add("metrics", ValueType.STRING)
.add("last_compaction_state", ValueType.STRING)
@@ -213,7 +214,7 @@ public SystemSchema(
{
Preconditions.checkNotNull(serverView, "serverView");
this.tableMap = ImmutableMap.of(
-SEGMENTS_TABLE, new SegmentsTable(druidSchema, metadataView, authorizerMapper),
+SEGMENTS_TABLE, new SegmentsTable(druidSchema, metadataView, jsonMapper, authorizerMapper),
SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, serverInventoryView, authorizerMapper),
SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper),
TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, authorizerMapper),
@@ -233,17 +234,20 @@ public Map<String, Table> getTableMap()
static class SegmentsTable extends AbstractTable implements ScannableTable
{
private final DruidSchema druidSchema;
+private final ObjectMapper jsonMapper;
private final AuthorizerMapper authorizerMapper;
private final MetadataSegmentView metadataView;

public SegmentsTable(
DruidSchema druidSchemna,
MetadataSegmentView metadataView,
+ObjectMapper jsonMapper,
AuthorizerMapper authorizerMapper
)
{
this.druidSchema = druidSchemna;
this.metadataView = metadataView;
+this.jsonMapper = jsonMapper;
this.authorizerMapper = authorizerMapper;
}

@@ -296,25 +300,30 @@ public Enumerable<Object[]> scan(DataContext root)
isAvailable = partialSegmentData.isAvailable();
isRealtime = partialSegmentData.isRealtime();
}
-return new Object[]{
-    segment.getId(),
-    segment.getDataSource(),
-    segment.getInterval().getStart().toString(),
-    segment.getInterval().getEnd().toString(),
-    segment.getSize(),
-    segment.getVersion(),
-    (long) segment.getShardSpec().getPartitionNum(),
-    numReplicas,
-    numRows,
-    IS_PUBLISHED_TRUE, //is_published is true for published segments
-    isAvailable,
-    isRealtime,
-    val.isOvershadowed() ? IS_OVERSHADOWED_TRUE : IS_OVERSHADOWED_FALSE,
-    segment.getShardSpec(),
-    segment.getDimensions(),
-    segment.getMetrics(),
-    segment.getLastCompactionState()
-};
+try {
+  return new Object[]{
+      segment.getId(),
+      segment.getDataSource(),
+      segment.getInterval().getStart().toString(),
+      segment.getInterval().getEnd().toString(),
+      segment.getSize(),
+      segment.getVersion(),
+      (long) segment.getShardSpec().getPartitionNum(),
+      numReplicas,
+      numRows,
+      IS_PUBLISHED_TRUE, //is_published is true for published segments
+      isAvailable,
+      isRealtime,
+      val.isOvershadowed() ? IS_OVERSHADOWED_TRUE : IS_OVERSHADOWED_FALSE,
+      segment.getShardSpec() == null ? null : jsonMapper.writeValueAsString(segment.getShardSpec()),
+      segment.getDimensions() == null ? null : jsonMapper.writeValueAsString(segment.getDimensions()),
+      segment.getMetrics() == null ? null : jsonMapper.writeValueAsString(segment.getMetrics()),
+      segment.getLastCompactionState() == null ? null : jsonMapper.writeValueAsString(segment.getLastCompactionState())
+  };
+}
+catch (JsonProcessingException e) {
+  throw new RuntimeException(e);
+}
});

final FluentIterable<Object[]> availableSegments = FluentIterable
@@ -328,26 +337,33 @@ public Enumerable<Object[]> scan(DataContext root)
}
final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getKey());
final long numReplicas = partialSegmentData == null ? 0L : partialSegmentData.getNumReplicas();
-return new Object[]{
-    val.getKey(),
-    val.getKey().getDataSource(),
-    val.getKey().getInterval().getStart().toString(),
-    val.getKey().getInterval().getEnd().toString(),
-    val.getValue().getSegment().getSize(),
-    val.getKey().getVersion(),
-    (long) val.getValue().getSegment().getShardSpec().getPartitionNum(),
-    numReplicas,
-    val.getValue().getNumRows(),
-    IS_PUBLISHED_FALSE, // is_published is false for unpublished segments
-    // is_available is assumed to be always true for segments announced by historicals or realtime tasks
-    IS_AVAILABLE_TRUE,
-    val.getValue().isRealtime(),
-    IS_OVERSHADOWED_FALSE, // there is an assumption here that unpublished segments are never overshadowed
-    val.getValue().getSegment().getShardSpec(),
-    val.getValue().getSegment().getDimensions(),
-    val.getValue().getSegment().getMetrics(),
-    null // unpublished segments from realtime tasks will not be compacted yet
-};
+try {
+  return new Object[]{
+      val.getKey(),
+      val.getKey().getDataSource(),
+      val.getKey().getInterval().getStart().toString(),
+      val.getKey().getInterval().getEnd().toString(),
+      val.getValue().getSegment().getSize(),
+      val.getKey().getVersion(),
+      (long) val.getValue().getSegment().getShardSpec().getPartitionNum(),
+      numReplicas,
+      val.getValue().getNumRows(),
+      IS_PUBLISHED_FALSE,
+      // is_published is false for unpublished segments
+      // is_available is assumed to be always true for segments announced by historicals or realtime tasks
+      IS_AVAILABLE_TRUE,
+      val.getValue().isRealtime(),
+      IS_OVERSHADOWED_FALSE,
+      // there is an assumption here that unpublished segments are never overshadowed
+      val.getValue().getSegment().getShardSpec() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getShardSpec()),
+      val.getValue().getSegment().getDimensions() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getDimensions()),
+      val.getValue().getSegment().getMetrics() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getMetrics()),
+      null // unpublished segments from realtime tasks will not be compacted yet
+  };
+}
+catch (JsonProcessingException e) {
+  throw new RuntimeException(e);
+}
});

final Iterable<Object[]> allSegments = Iterables.unmodifiableIterable(
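One practical consequence of serializing these columns to JSON strings: at the time of this PR (2020), Druid SQL had no JSON functions, so consumers could only treat the values as text. A minimal sketch, assuming one wants segments whose dimension list contains `page` (the LIKE pattern is an illustrative assumption, not from this PR):

```sql
-- Illustrative only: dimensions is now a JSON array string such as
-- ["anonymous","area_code",...,"page",...], so a quoted-name substring match
-- is one crude way to filter on it.
SELECT segment_id, dimensions
FROM sys.segments
WHERE dimensions LIKE '%"page"%';
```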
sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java

@@ -95,7 +95,6 @@
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
import org.apache.druid.timeline.partition.NumberedShardSpec;
-import org.apache.druid.timeline.partition.ShardSpec;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -523,9 +522,9 @@ public void testGetTableMap()
}

@Test
-public void testSegmentsTable()
+public void testSegmentsTable() throws Exception
{
-final SegmentsTable segmentsTable = new SegmentsTable(druidSchema, metadataView, authMapper);
+final SegmentsTable segmentsTable = new SegmentsTable(druidSchema, metadataView, new ObjectMapper(), authMapper);
final Set<SegmentWithOvershadowedStatus> publishedSegments = new HashSet<>(Arrays.asList(
new SegmentWithOvershadowedStatus(publishedCompactedSegment1, true),
new SegmentWithOvershadowedStatus(publishedCompactedSegment2, false),
@@ -706,7 +705,7 @@ private void verifyRow(
long isRealtime,
long isOvershadowed,
CompactionState compactionState
-)
+) throws Exception
{
Assert.assertEquals(segmentId, row[0].toString());
SegmentId id = Iterables.get(SegmentId.iterateAllPossibleParsings(segmentId), 0);
@@ -722,7 +721,11 @@ private void verifyRow(
Assert.assertEquals(isAvailable, row[10]);
Assert.assertEquals(isRealtime, row[11]);
Assert.assertEquals(isOvershadowed, row[12]);
-Assert.assertEquals(compactionState, row[16]);
+if (compactionState == null) {
+  Assert.assertNull(row[16]);
+} else {
+  Assert.assertEquals(mapper.writeValueAsString(compactionState), row[16]);
+}
}

@Test
@@ -1289,12 +1292,6 @@ private static void verifyTypes(final List<Object[]> rows, final RowSignature si
case STRING:
if (signature.getColumnName(i).equals("segment_id")) {
expectedClass = SegmentId.class;
-} else if (signature.getColumnName(i).equals("shardSpec")) {
-  expectedClass = ShardSpec.class;
-} else if (signature.getColumnName(i).equals("last_compaction_state")) {
-  expectedClass = CompactionState.class;
-} else if (signature.getColumnName(i).equals("dimensions") || signature.getColumnName(i).equals("metrics")) {
-  expectedClass = List.class;
} else {
expectedClass = String.class;
}
website/.spelling (1 change: 1 addition & 0 deletions)

@@ -1513,6 +1513,7 @@ queue_insertion_time
runner_status
segment_id
server_type
+shard_spec
sqlTimeZone
supervisor_id
sys