`docs/querying/sql.md` (3 additions & 1 deletion):

```diff
@@ -986,7 +986,9 @@ Segments table provides details on all Druid segments, whether they are published
 |is_available|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is currently being served by any process (Historical or realtime). See the [Architecture page](../design/architecture.md#segment-lifecycle) for more details.|
 |is_realtime|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is _only_ served by realtime tasks, and 0 if any historical process is serving this segment.|
 |is_overshadowed|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is published and is _fully_ overshadowed by some other published segments. Currently, is_overshadowed is always false for unpublished segments, although this may change in the future. You can filter for segments that "should be published" by filtering for `is_published = 1 AND is_overshadowed = 0`. Segments can briefly be both published and overshadowed if they were recently replaced, but have not been unpublished yet. See the [Architecture page](../design/architecture.md#segment-lifecycle) for more details.|
-|payload|STRING|JSON-serialized data segment payload|
+|shardSpec|STRING|The toString of specific `ShardSpec`|
+|dimensions|STRING|The dimensions of the segment|
+|metrics|STRING|The metrics of the segment|
 
 For example to retrieve all segments for datasource "wikipedia", use the query:
 
```
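Since the three new columns are plain strings rather than a JSON blob, they can be read straight off a SQL result. Below is a minimal sketch of querying them from TypeScript; the Router address `localhost:8888` and the `/druid/v2/sql` endpoint are standard Druid defaults rather than anything introduced by this change, and the `wikipedia` filter just mirrors the docs example above.

```typescript
// Sketch: read the new sys.segments columns over Druid's SQL HTTP API.
// Assumes a Router on localhost:8888 and Node 18+/browser for global fetch.
async function showSegmentColumns(): Promise<void> {
  const response = await fetch('http://localhost:8888/druid/v2/sql', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      query: `SELECT "segment_id", "shardSpec", "dimensions", "metrics"
              FROM sys.segments
              WHERE "datasource" = 'wikipedia'
                AND is_published = 1 AND is_overshadowed = 0
              LIMIT 10`,
    }),
  });
  // The default result format is an array of JSON objects, one per row; the
  // new columns arrive as plain strings (no JSON.parse step needed).
  const rows: Array<Record<string, string>> = await response.json();
  for (const row of rows) {
    console.log(row.segment_id, row.shardSpec, row.dimensions, row.metrics);
  }
}
```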
Example `sys.segments` row:

```diff
@@ -13,6 +13,8 @@
     "is_available": 1,
     "is_realtime": 0,
     "is_overshadowed": 0,
-    "payload": "{\"overshadowed\":false,\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space\":\"\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist,unique_users,variation\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":446027801,\"identifier\":\"auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\"}"
+    "shardSpec": "NoneShardSpec",
+    "dimensions": "[anonymous, area_code, city, continent_code, country_name, dma_code, geo, language, namespace, network, newpage, page, postal_code, region_lookup, robot, unpatrolled, user]",
+    "metrics": "[added, count, deleted, delta, delta_hist, unique_users, variation]"
   }
 ]
```
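The practical difference is visible above: the old `payload` column packed `dimensions`, `metrics`, and `shardSpec` into one JSON-serialized string, while the new columns are directly readable. A hypothetical before/after contrast follows (the `SegmentRow` type and `getDimensions` helper are illustration only; the web-console diff further down removes exactly this kind of parsing):

```typescript
// Illustration: extracting a segment's dimensions before and after this PR.
// SegmentRow is a made-up type; field names mirror the example row above.
interface SegmentRow {
  payload?: string; // old column: JSON-serialized DataSegment
  dimensions?: string; // new column: plain comma-separated string
}

function getDimensions(row: SegmentRow): string {
  if (row.payload !== undefined) {
    // Old approach: decode the whole payload, then pull out one field.
    return JSON.parse(row.payload).dimensions ?? '';
  }
  // New approach: the column is already a plain string.
  return row.dimensions ?? '';
}
```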
`SystemSchema` (package `org.apache.druid.sql.calcite.schema`):

```diff
@@ -19,7 +19,6 @@
 
 package org.apache.druid.sql.calcite.schema;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
```

```diff
@@ -56,7 +55,6 @@
 import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
-import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.http.client.Request;
```
```diff
@@ -143,7 +141,9 @@ public class SystemSchema extends AbstractSchema
       .add("is_available", ValueType.LONG)
       .add("is_realtime", ValueType.LONG)
       .add("is_overshadowed", ValueType.LONG)
-      .add("payload", ValueType.STRING)
+      .add("shardSpec", ValueType.STRING)
+      .add("dimensions", ValueType.STRING)
+      .add("metrics", ValueType.STRING)
       .build();
 
   static final RowSignature SERVERS_SIGNATURE = RowSignature
```
```diff
@@ -294,37 +294,34 @@ public Enumerable<Object[]> scan(DataContext root)
     final FluentIterable<Object[]> publishedSegments = FluentIterable
         .from(() -> getAuthorizedPublishedSegments(metadataStoreSegments, root))
         .transform(val -> {
-          try {
-            final DataSegment segment = val.getDataSegment();
-            segmentsAlreadySeen.add(segment.getId());
-            final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(segment.getId());
-            long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 0L;
-            if (partialSegmentData != null) {
-              numReplicas = partialSegmentData.getNumReplicas();
-              numRows = partialSegmentData.getNumRows();
-              isAvailable = partialSegmentData.isAvailable();
-              isRealtime = partialSegmentData.isRealtime();
-            }
-            return new Object[]{
-                segment.getId(),
-                segment.getDataSource(),
-                segment.getInterval().getStart().toString(),
-                segment.getInterval().getEnd().toString(),
-                segment.getSize(),
-                segment.getVersion(),
-                Long.valueOf(segment.getShardSpec().getPartitionNum()),
-                numReplicas,
-                numRows,
-                IS_PUBLISHED_TRUE, //is_published is true for published segments
-                isAvailable,
-                isRealtime,
-                val.isOvershadowed() ? IS_OVERSHADOWED_TRUE : IS_OVERSHADOWED_FALSE,
-                jsonMapper.writeValueAsString(val)
-            };
-          }
-          catch (JsonProcessingException e) {
-            throw new RE(e, "Error getting segment payload for segment %s", val.getDataSegment().getId());
-          }
+          final DataSegment segment = val.getDataSegment();
+          segmentsAlreadySeen.add(segment.getId());
+          final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(segment.getId());
+          long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 0L;
+          if (partialSegmentData != null) {
+            numReplicas = partialSegmentData.getNumReplicas();
+            numRows = partialSegmentData.getNumRows();
+            isAvailable = partialSegmentData.isAvailable();
+            isRealtime = partialSegmentData.isRealtime();
+          }
+          return new Object[]{
+              segment.getId(),
+              segment.getDataSource(),
+              segment.getInterval().getStart().toString(),
+              segment.getInterval().getEnd().toString(),
+              segment.getSize(),
+              segment.getVersion(),
+              Long.valueOf(segment.getShardSpec().getPartitionNum()),
+              numReplicas,
+              numRows,
+              IS_PUBLISHED_TRUE, //is_published is true for published segments
+              isAvailable,
+              isRealtime,
+              val.isOvershadowed() ? IS_OVERSHADOWED_TRUE : IS_OVERSHADOWED_FALSE,
+              segment.getShardSpec(),
+              segment.getDimensions(),
+              segment.getMetrics()
+          };
         });
 
     final FluentIterable<Object[]> availableSegments = FluentIterable
```
```diff
@@ -333,33 +330,30 @@ public Enumerable<Object[]> scan(DataContext root)
             root
         ))
         .transform(val -> {
-          try {
-            if (segmentsAlreadySeen.contains(val.getKey())) {
-              return null;
-            }
-            final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getKey());
-            final long numReplicas = partialSegmentData == null ? 0L : partialSegmentData.getNumReplicas();
-            return new Object[]{
-                val.getKey(),
-                val.getKey().getDataSource(),
-                val.getKey().getInterval().getStart().toString(),
-                val.getKey().getInterval().getEnd().toString(),
-                val.getValue().getSegment().getSize(),
-                val.getKey().getVersion(),
-                (long) val.getValue().getSegment().getShardSpec().getPartitionNum(),
-                numReplicas,
-                val.getValue().getNumRows(),
-                IS_PUBLISHED_FALSE, // is_published is false for unpublished segments
-                // is_available is assumed to be always true for segments announced by historicals or realtime tasks
-                IS_AVAILABLE_TRUE,
-                val.getValue().isRealtime(),
-                IS_OVERSHADOWED_FALSE, // there is an assumption here that unpublished segments are never overshadowed
-                jsonMapper.writeValueAsString(val.getKey())
-            };
-          }
-          catch (JsonProcessingException e) {
-            throw new RE(e, "Error getting segment payload for segment %s", val.getKey());
-          }
+          if (segmentsAlreadySeen.contains(val.getKey())) {
+            return null;
+          }
+          final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getKey());
+          final long numReplicas = partialSegmentData == null ? 0L : partialSegmentData.getNumReplicas();
+          return new Object[]{
+              val.getKey(),
+              val.getKey().getDataSource(),
+              val.getKey().getInterval().getStart().toString(),
+              val.getKey().getInterval().getEnd().toString(),
+              val.getValue().getSegment().getSize(),
+              val.getKey().getVersion(),
+              (long) val.getValue().getSegment().getShardSpec().getPartitionNum(),
+              numReplicas,
+              val.getValue().getNumRows(),
+              IS_PUBLISHED_FALSE, // is_published is false for unpublished segments
+              // is_available is assumed to be always true for segments announced by historicals or realtime tasks
+              IS_AVAILABLE_TRUE,
+              val.getValue().isRealtime(),
+              IS_OVERSHADOWED_FALSE, // there is an assumption here that unpublished segments are never overshadowed
+              val.getValue().getSegment().getShardSpec(),
+              val.getValue().getSegment().getDimensions(),
+              val.getValue().getSegment().getMetrics()
+          };
         });
 
     final Iterable<Object[]> allSegments = Iterables.unmodifiableIterable(
```
`SystemSchema` tests:

```diff
@@ -88,6 +88,7 @@
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.timeline.partition.ShardSpec;
 import org.easymock.EasyMock;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponse;
```

```diff
@@ -467,7 +468,7 @@ public void testGetTableMap()
     final RelDataType rowType = segmentsTable.getRowType(new JavaTypeFactoryImpl());
     final List<RelDataTypeField> fields = rowType.getFieldList();
 
-    Assert.assertEquals(14, fields.size());
+    Assert.assertEquals(16, fields.size());
 
     final SystemSchema.TasksTable tasksTable = (SystemSchema.TasksTable) schema.getTableMap().get("tasks");
     final RelDataType sysRowType = tasksTable.getRowType(new JavaTypeFactoryImpl());
```

```diff
@@ -1243,6 +1244,10 @@ private static void verifyTypes(final List<Object[]> rows, final RowSignature signature)
       case STRING:
         if (signature.getColumnName(i).equals("segment_id")) {
           expectedClass = SegmentId.class;
+        } else if (signature.getColumnName(i).equals("shardSpec")) {
+          expectedClass = ShardSpec.class;
+        } else if (signature.getColumnName(i).equals("dimensions") || signature.getColumnName(i).equals("metrics")) {
+          expectedClass = List.class;
         } else {
           expectedClass = String.class;
         }
```
`web-console/src/views/segments-view/segments-view.tsx` (3 additions & 11 deletions):

```diff
@@ -146,7 +146,6 @@ interface SegmentQueryResultRow {
   version: string;
   size: 0;
   partition_num: number;
-  payload: any;
   num_rows: number;
   num_replicas: number;
   is_available: number;
```
```diff
@@ -211,6 +210,7 @@ export class SegmentsView extends React.PureComponent<SegmentsViewProps, SegmentsViewState> {
         `FROM sys.segments`,
         whereClause ? `WHERE ${whereClause}` : '',
         `GROUP BY 1`,
+        `ORDER BY 1 DESC`,
         `LIMIT ${totalQuerySize}`,
       ]).join('\n');
 
```
```diff
@@ -221,7 +221,7 @@ export class SegmentsView extends React.PureComponent<SegmentsViewProps, SegmentsViewState> {
       queryParts = compact([
         `SELECT`,
         ` ("start" || '/' || "end") AS "interval",`,
-        ` "segment_id", "datasource", "start", "end", "size", "version", "partition_num", "num_replicas", "num_rows", "is_published", "is_available", "is_realtime", "is_overshadowed", "payload"`,
+        ` "segment_id", "datasource", "start", "end", "size", "version", "partition_num", "num_replicas", "num_rows", "is_published", "is_available", "is_realtime", "is_overshadowed"`,
         `FROM sys.segments`,
         `WHERE`,
         intervals ? ` ("start" || '/' || "end") IN (${intervals})` : 'FALSE',
```

Review thread on the updated `SELECT` line:

> **surekhasaharan:** I don't know TypeScript much, but shouldn't we add the additional fields to this query?
>
> **Contributor (author):** Hi @surekhasaharan, I think the content in this popover already includes these fields. *[screenshot of the segment detail popover]*
>
> **Member:** Yeah, it looks like this information is fetched outside of the system table, from `/druid/coordinator/v1/metadata/datasources/${datasourceId}/segments/${segmentId}`
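To make the last comment concrete, here is a hedged sketch of that Coordinator lookup. Only the URL template comes from the review thread; the helper name and the untyped JSON return are assumptions.

```typescript
// Sketch of the per-segment detail fetch the reviewers describe: the console
// popover reads segment metadata from the Coordinator API, not sys.segments.
async function getSegmentDetail(
  datasourceId: string,
  segmentId: string,
): Promise<Record<string, unknown>> {
  const url =
    `/druid/coordinator/v1/metadata/datasources/${encodeURIComponent(datasourceId)}` +
    `/segments/${encodeURIComponent(segmentId)}`;
  const response = await fetch(url);
  if (!response.ok) {
    throw new Error(`Coordinator request failed: HTTP ${response.status}`);
  }
  return response.json();
}
```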
```diff
@@ -240,7 +240,7 @@ export class SegmentsView extends React.PureComponent<SegmentsViewProps, SegmentsViewState> {
         queryParts.push(`LIMIT ${totalQuerySize * 1000}`);
       } else {
         queryParts = [
-          `SELECT "segment_id", "datasource", "start", "end", "size", "version", "partition_num", "num_replicas", "num_rows", "is_published", "is_available", "is_realtime", "is_overshadowed", "payload"`,
+          `SELECT "segment_id", "datasource", "start", "end", "size", "version", "partition_num", "num_replicas", "num_rows", "is_published", "is_available", "is_realtime", "is_overshadowed"`,
           `FROM sys.segments`,
         ];
 
```
```diff
@@ -264,13 +264,6 @@ export class SegmentsView extends React.PureComponent<SegmentsViewProps, SegmentsViewState> {
       const results: any[] = (await queryDruidSql({ query: sqlQuery })).slice(
         query.page * query.pageSize,
       );
-      results.forEach(result => {
-        try {
-          result.payload = JSON.parse(result.payload);
-        } catch {
-          result.payload = {};
-        }
-      });
       return results;
     },
     onStateChange: ({ result, loading, error }) => {
```
```diff
@@ -299,7 +292,6 @@ export class SegmentsView extends React.PureComponent<SegmentsViewProps, SegmentsViewState> {
         version: segment.version,
         partition_num: segment.shardSpec.partitionNum ? 0 : segment.shardSpec.partitionNum,
         size: segment.size,
-        payload: segment,
         num_rows: -1,
         num_replicas: -1,
         is_available: -1,
```