From 9992c0d331924cabc0601ff8c37940601530db17 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 1 Oct 2020 14:21:07 -0700 Subject: [PATCH 1/6] fix JSON format --- .../sql/calcite/schema/SystemSchema.java | 97 +++++++++++-------- .../sql/calcite/schema/SystemSchemaTest.java | 2 +- 2 files changed, 57 insertions(+), 42 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 16e5c0aa3b53..791de6684a6e 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -19,6 +19,7 @@ package org.apache.druid.sql.calcite.schema; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; @@ -142,7 +143,7 @@ public class SystemSchema extends AbstractSchema .add("is_available", ValueType.LONG) .add("is_realtime", ValueType.LONG) .add("is_overshadowed", ValueType.LONG) - .add("shardSpec", ValueType.STRING) + .add("shard_spec", ValueType.STRING) .add("dimensions", ValueType.STRING) .add("metrics", ValueType.STRING) .add("last_compaction_state", ValueType.STRING) @@ -213,7 +214,7 @@ public SystemSchema( { Preconditions.checkNotNull(serverView, "serverView"); this.tableMap = ImmutableMap.of( - SEGMENTS_TABLE, new SegmentsTable(druidSchema, metadataView, authorizerMapper), + SEGMENTS_TABLE, new SegmentsTable(druidSchema, metadataView, jsonMapper, authorizerMapper), SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, serverInventoryView, authorizerMapper), SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper), TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, authorizerMapper), @@ -233,17 +234,20 @@ public Map getTableMap() static class SegmentsTable extends AbstractTable implements ScannableTable { private final DruidSchema druidSchema; + private final ObjectMapper jsonMapper; private final AuthorizerMapper authorizerMapper; private final MetadataSegmentView metadataView; public SegmentsTable( DruidSchema druidSchemna, MetadataSegmentView metadataView, + ObjectMapper jsonMapper, AuthorizerMapper authorizerMapper ) { this.druidSchema = druidSchemna; this.metadataView = metadataView; + this.jsonMapper = jsonMapper; this.authorizerMapper = authorizerMapper; } @@ -296,25 +300,29 @@ public Enumerable scan(DataContext root) isAvailable = partialSegmentData.isAvailable(); isRealtime = partialSegmentData.isRealtime(); } - return new Object[]{ - segment.getId(), - segment.getDataSource(), - segment.getInterval().getStart().toString(), - segment.getInterval().getEnd().toString(), - segment.getSize(), - segment.getVersion(), - (long) segment.getShardSpec().getPartitionNum(), - numReplicas, - numRows, - IS_PUBLISHED_TRUE, //is_published is true for published segments - isAvailable, - isRealtime, - val.isOvershadowed() ? IS_OVERSHADOWED_TRUE : IS_OVERSHADOWED_FALSE, - segment.getShardSpec(), - segment.getDimensions(), - segment.getMetrics(), - segment.getLastCompactionState() - }; + try { + return new Object[]{ + segment.getId(), + segment.getDataSource(), + segment.getInterval().getStart().toString(), + segment.getInterval().getEnd().toString(), + segment.getSize(), + segment.getVersion(), + (long) segment.getShardSpec().getPartitionNum(), + numReplicas, + numRows, + IS_PUBLISHED_TRUE, //is_published is true for published segments + isAvailable, + isRealtime, + val.isOvershadowed() ? IS_OVERSHADOWED_TRUE : IS_OVERSHADOWED_FALSE, + jsonMapper.writeValueAsString(segment.getShardSpec()), + jsonMapper.writeValueAsString(segment.getDimensions()), + jsonMapper.writeValueAsString(segment.getMetrics()), + jsonMapper.writeValueAsString(segment.getLastCompactionState()) + }; + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } }); final FluentIterable availableSegments = FluentIterable @@ -328,26 +336,33 @@ public Enumerable scan(DataContext root) } final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getKey()); final long numReplicas = partialSegmentData == null ? 0L : partialSegmentData.getNumReplicas(); - return new Object[]{ - val.getKey(), - val.getKey().getDataSource(), - val.getKey().getInterval().getStart().toString(), - val.getKey().getInterval().getEnd().toString(), - val.getValue().getSegment().getSize(), - val.getKey().getVersion(), - (long) val.getValue().getSegment().getShardSpec().getPartitionNum(), - numReplicas, - val.getValue().getNumRows(), - IS_PUBLISHED_FALSE, // is_published is false for unpublished segments - // is_available is assumed to be always true for segments announced by historicals or realtime tasks - IS_AVAILABLE_TRUE, - val.getValue().isRealtime(), - IS_OVERSHADOWED_FALSE, // there is an assumption here that unpublished segments are never overshadowed - val.getValue().getSegment().getShardSpec(), - val.getValue().getSegment().getDimensions(), - val.getValue().getSegment().getMetrics(), - null // unpublished segments from realtime tasks will not be compacted yet - }; + try { + return new Object[]{ + val.getKey(), + val.getKey().getDataSource(), + val.getKey().getInterval().getStart().toString(), + val.getKey().getInterval().getEnd().toString(), + val.getValue().getSegment().getSize(), + val.getKey().getVersion(), + (long) val.getValue().getSegment().getShardSpec().getPartitionNum(), + numReplicas, + val.getValue().getNumRows(), + IS_PUBLISHED_FALSE, + // is_published is false for unpublished segments + // is_available is assumed to be always true for segments announced by historicals or realtime tasks + IS_AVAILABLE_TRUE, + val.getValue().isRealtime(), + IS_OVERSHADOWED_FALSE, + // there is an assumption here that unpublished segments are never overshadowed + jsonMapper.writeValueAsString(val.getValue().getSegment().getShardSpec()), + jsonMapper.writeValueAsString(val.getValue().getSegment().getDimensions()), + jsonMapper.writeValueAsString(val.getValue().getSegment().getMetrics()), + null + // unpublished segments from realtime tasks will not be compacted yet + }; + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } }); final Iterable allSegments = Iterables.unmodifiableIterable( diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index f943aafef54d..f0d68774a5de 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -525,7 +525,7 @@ public void testGetTableMap() @Test public void testSegmentsTable() { - final SegmentsTable segmentsTable = new SegmentsTable(druidSchema, metadataView, authMapper); + final SegmentsTable segmentsTable = new SegmentsTable(druidSchema, metadataView, new ObjectMapper(), authMapper); final Set publishedSegments = new HashSet<>(Arrays.asList( new SegmentWithOvershadowedStatus(publishedCompactedSegment1, true), new SegmentWithOvershadowedStatus(publishedCompactedSegment2, false), From 1c14aad4c67a5d538a6923695f96f4d9fd83af4d Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 1 Oct 2020 18:11:26 -0700 Subject: [PATCH 2/6] Change all columns in sys segments to be JSON --- .../druid/sql/calcite/schema/SystemSchema.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 791de6684a6e..3e1f681afbc2 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -315,10 +315,10 @@ public Enumerable scan(DataContext root) isAvailable, isRealtime, val.isOvershadowed() ? IS_OVERSHADOWED_TRUE : IS_OVERSHADOWED_FALSE, - jsonMapper.writeValueAsString(segment.getShardSpec()), - jsonMapper.writeValueAsString(segment.getDimensions()), - jsonMapper.writeValueAsString(segment.getMetrics()), - jsonMapper.writeValueAsString(segment.getLastCompactionState()) + segment.getShardSpec() == null ? null : jsonMapper.writeValueAsString(segment.getShardSpec()), + segment.getDimensions() == null ? null : jsonMapper.writeValueAsString(segment.getDimensions()), + segment.getMetrics() == null ? null : jsonMapper.writeValueAsString(segment.getMetrics()), + segment.getLastCompactionState() == null ? null : jsonMapper.writeValueAsString(segment.getLastCompactionState()) }; } catch (JsonProcessingException e) { throw new RuntimeException(e); @@ -354,11 +354,10 @@ public Enumerable scan(DataContext root) val.getValue().isRealtime(), IS_OVERSHADOWED_FALSE, // there is an assumption here that unpublished segments are never overshadowed - jsonMapper.writeValueAsString(val.getValue().getSegment().getShardSpec()), - jsonMapper.writeValueAsString(val.getValue().getSegment().getDimensions()), - jsonMapper.writeValueAsString(val.getValue().getSegment().getMetrics()), - null - // unpublished segments from realtime tasks will not be compacted yet + val.getValue().getSegment().getShardSpec() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getShardSpec()), + val.getValue().getSegment().getDimensions() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getDimensions()), + val.getValue().getSegment().getMetrics() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getMetrics()), + null // unpublished segments from realtime tasks will not be compacted yet }; } catch (JsonProcessingException e) { throw new RuntimeException(e); From a2f9d86cafc1b634af904599cf96c813e56e7553 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 1 Oct 2020 18:13:02 -0700 Subject: [PATCH 3/6] Change all columns in sys segments to be JSON --- .../org/apache/druid/sql/calcite/schema/SystemSchema.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 3e1f681afbc2..19c6b22d7b98 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -320,7 +320,8 @@ public Enumerable scan(DataContext root) segment.getMetrics() == null ? null : jsonMapper.writeValueAsString(segment.getMetrics()), segment.getLastCompactionState() == null ? null : jsonMapper.writeValueAsString(segment.getLastCompactionState()) }; - } catch (JsonProcessingException e) { + } + catch (JsonProcessingException e) { throw new RuntimeException(e); } }); @@ -359,7 +360,8 @@ public Enumerable scan(DataContext root) val.getValue().getSegment().getMetrics() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getMetrics()), null // unpublished segments from realtime tasks will not be compacted yet }; - } catch (JsonProcessingException e) { + } + catch (JsonProcessingException e) { throw new RuntimeException(e); } }); From 4a0448d736cd26aa56280632631139e158e1d13a Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 5 Oct 2020 16:46:37 -0700 Subject: [PATCH 4/6] add tests --- docs/querying/sql.md | 8 ++++---- .../resources/results/auth_test_sys_schema_segments.json | 6 +++--- .../apache/druid/sql/calcite/schema/SystemSchemaTest.java | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/querying/sql.md b/docs/querying/sql.md index 702070d9ace3..395070fe8412 100644 --- a/docs/querying/sql.md +++ b/docs/querying/sql.md @@ -1083,10 +1083,10 @@ Segments table provides details on all Druid segments, whether they are publishe |is_available|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is currently being served by any process(Historical or realtime). See the [Architecture page](../design/architecture.md#segment-lifecycle) for more details.| |is_realtime|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is _only_ served by realtime tasks, and 0 if any historical process is serving this segment.| |is_overshadowed|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is published and is _fully_ overshadowed by some other published segments. Currently, is_overshadowed is always false for unpublished segments, although this may change in the future. You can filter for segments that "should be published" by filtering for `is_published = 1 AND is_overshadowed = 0`. Segments can briefly be both published and overshadowed if they were recently replaced, but have not been unpublished yet. See the [Architecture page](../design/architecture.md#segment-lifecycle) for more details.| -|shardSpec|STRING|The toString of specific `ShardSpec`| -|dimensions|STRING|The dimensions of the segment| -|metrics|STRING|The metrics of the segment| -|last_compaction_state|STRING|The configurations of the compaction task which created this segment. May be null if segment was not created by compaction task.| +|shard_spec|STRING|JSON-serialized of the segment `ShardSpec`| +|dimensions|STRING|JSON-serialized of the segment dimensions| +|metrics|STRING|JSON-serialized of the segment metrics| +|last_compaction_state|STRING|JSON-serialized configurations of the compaction task which created this segment. May be null if segment was not created by compaction task.| For example to retrieve all segments for datasource "wikipedia", use the query: diff --git a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json index b09e9de9a166..09f7a400265c 100644 --- a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json +++ b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json @@ -13,9 +13,9 @@ "is_available": 1, "is_realtime": 0, "is_overshadowed": 0, - "shardSpec": "NoneShardSpec", - "dimensions": "[anonymous, area_code, city, continent_code, country_name, dma_code, geo, language, namespace, network, newpage, page, postal_code, region_lookup, robot, unpatrolled, user]", - "metrics": "[added, count, deleted, delta, delta_hist, unique_users, variation]", + "shard_spec": "{\"type\":\"none\"}", + "dimensions": "[\"anonymous\",\"area_code\",\"city\",\"continent_code\",\"country_name\",\"dma_code\",\"geo\",\"language\",\"namespace\",\"network\",\"newpage\",\"page\",\"postal_code\",\"region_lookup\",\"robot\",\"unpatrolled\",\"user\"]", + "metrics": "[\"added\",\"count\",\"deleted\",\"delta\",\"delta_hist\",\"unique_users\",\"variation\"], last_compaction_state=null}]", "last_compaction_state": null } ] diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index f0d68774a5de..e82a2df1f4af 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -1289,7 +1289,7 @@ private static void verifyTypes(final List rows, final RowSignature si case STRING: if (signature.getColumnName(i).equals("segment_id")) { expectedClass = SegmentId.class; - } else if (signature.getColumnName(i).equals("shardSpec")) { + } else if (signature.getColumnName(i).equals("shard_spec")) { expectedClass = ShardSpec.class; } else if (signature.getColumnName(i).equals("last_compaction_state")) { expectedClass = CompactionState.class; From a0d83f0d6ae172d21209b1cd4d025300d99dffe0 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 13 Oct 2020 22:21:04 -0700 Subject: [PATCH 5/6] fix failing tests --- docs/querying/sql.md | 8 ++++---- .../results/auth_test_sys_schema_segments.json | 2 +- .../sql/calcite/schema/SystemSchemaTest.java | 16 +++++++--------- website/.spelling | 1 + 4 files changed, 13 insertions(+), 14 deletions(-) diff --git a/docs/querying/sql.md b/docs/querying/sql.md index 395070fe8412..861397265bf8 100644 --- a/docs/querying/sql.md +++ b/docs/querying/sql.md @@ -1083,10 +1083,10 @@ Segments table provides details on all Druid segments, whether they are publishe |is_available|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is currently being served by any process(Historical or realtime). See the [Architecture page](../design/architecture.md#segment-lifecycle) for more details.| |is_realtime|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is _only_ served by realtime tasks, and 0 if any historical process is serving this segment.| |is_overshadowed|LONG|Boolean is represented as long type where 1 = true, 0 = false. 1 if this segment is published and is _fully_ overshadowed by some other published segments. Currently, is_overshadowed is always false for unpublished segments, although this may change in the future. You can filter for segments that "should be published" by filtering for `is_published = 1 AND is_overshadowed = 0`. Segments can briefly be both published and overshadowed if they were recently replaced, but have not been unpublished yet. See the [Architecture page](../design/architecture.md#segment-lifecycle) for more details.| -|shard_spec|STRING|JSON-serialized of the segment `ShardSpec`| -|dimensions|STRING|JSON-serialized of the segment dimensions| -|metrics|STRING|JSON-serialized of the segment metrics| -|last_compaction_state|STRING|JSON-serialized configurations of the compaction task which created this segment. May be null if segment was not created by compaction task.| +|shard_spec|STRING|JSON-serialized form of the segment `ShardSpec`| +|dimensions|STRING|JSON-serialized form of the segment dimensions| +|metrics|STRING|JSON-serialized form of the segment metrics| +|last_compaction_state|STRING|JSON-serialized form of the compaction task's config (compaction task which created this segment). May be null if segment was not created by compaction task.| For example to retrieve all segments for datasource "wikipedia", use the query: diff --git a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json index 09f7a400265c..d064733ce798 100644 --- a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json +++ b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json @@ -15,7 +15,7 @@ "is_overshadowed": 0, "shard_spec": "{\"type\":\"none\"}", "dimensions": "[\"anonymous\",\"area_code\",\"city\",\"continent_code\",\"country_name\",\"dma_code\",\"geo\",\"language\",\"namespace\",\"network\",\"newpage\",\"page\",\"postal_code\",\"region_lookup\",\"robot\",\"unpatrolled\",\"user\"]", - "metrics": "[\"added\",\"count\",\"deleted\",\"delta\",\"delta_hist\",\"unique_users\",\"variation\"], last_compaction_state=null}]", + "metrics": "[\"added\",\"count\",\"deleted\",\"delta\",\"delta_hist\",\"unique_users\",\"variation\"]", "last_compaction_state": null } ] diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index e82a2df1f4af..87eb9a548cdd 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -523,7 +523,7 @@ public void testGetTableMap() } @Test - public void testSegmentsTable() + public void testSegmentsTable() throws Exception { final SegmentsTable segmentsTable = new SegmentsTable(druidSchema, metadataView, new ObjectMapper(), authMapper); final Set publishedSegments = new HashSet<>(Arrays.asList( @@ -706,7 +706,7 @@ private void verifyRow( long isRealtime, long isOvershadowed, CompactionState compactionState - ) + ) throws Exception { Assert.assertEquals(segmentId, row[0].toString()); SegmentId id = Iterables.get(SegmentId.iterateAllPossibleParsings(segmentId), 0); @@ -722,7 +722,11 @@ private void verifyRow( Assert.assertEquals(isAvailable, row[10]); Assert.assertEquals(isRealtime, row[11]); Assert.assertEquals(isOvershadowed, row[12]); - Assert.assertEquals(compactionState, row[16]); + if (compactionState == null) { + Assert.assertNull(row[16]); + } else { + Assert.assertEquals(mapper.writeValueAsString(compactionState), row[16]); + } } @Test @@ -1289,12 +1293,6 @@ private static void verifyTypes(final List rows, final RowSignature si case STRING: if (signature.getColumnName(i).equals("segment_id")) { expectedClass = SegmentId.class; - } else if (signature.getColumnName(i).equals("shard_spec")) { - expectedClass = ShardSpec.class; - } else if (signature.getColumnName(i).equals("last_compaction_state")) { - expectedClass = CompactionState.class; - } else if (signature.getColumnName(i).equals("dimensions") || signature.getColumnName(i).equals("metrics")) { - expectedClass = List.class; } else { expectedClass = String.class; } diff --git a/website/.spelling b/website/.spelling index a783d32ddc3b..80d67e2f8a3e 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1513,6 +1513,7 @@ queue_insertion_time runner_status segment_id server_type +shard_spec sqlTimeZone supervisor_id sys From c169833b09b54b216f08ad8a9efa0d4f5ba57e64 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 13 Oct 2020 22:22:25 -0700 Subject: [PATCH 6/6] fix failing tests --- .../org/apache/druid/sql/calcite/schema/SystemSchemaTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 87eb9a548cdd..46aa3cca6785 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -95,7 +95,6 @@ import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentWithOvershadowedStatus; import org.apache.druid.timeline.partition.NumberedShardSpec; -import org.apache.druid.timeline.partition.ShardSpec; import org.easymock.EasyMock; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpMethod;