diff --git a/api/src/main/java/io/druid/timeline/DataSegment.java b/api/src/main/java/io/druid/timeline/DataSegment.java index ec9f39bc81f1..97e9fffda727 100644 --- a/api/src/main/java/io/druid/timeline/DataSegment.java +++ b/api/src/main/java/io/druid/timeline/DataSegment.java @@ -221,6 +221,11 @@ public DataSegment withDimensions(List dimensions) return builder(this).dimensions(dimensions).build(); } + public DataSegment withMetrics(List metrics) + { + return builder(this).metrics(metrics).build(); + } + public DataSegment withSize(long size) { return builder(this).size(size).build(); diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index 7ea4de291dd2..7913e89a7c16 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -296,3 +296,7 @@ In current Druid, multiple data segments may be announced under the same Znode. |--------|-----------|-------| |`druid.announcer.segmentsPerNode`|Each Znode contains info for up to this many segments.|50| |`druid.announcer.maxBytesPerNode`|Max byte size for Znode.|524288| +|`druid.announcer.skipDimensions`|Skip Dimension list from segment announcements. NOTE: Enabling this will also remove the dimensions list from coordinator and broker endpoints.|false| +|`druid.announcer.skipMetrics`|Skip Metrics list from segment announcements. NOTE: Enabling this will also remove the metrics list from coordinator and broker endpoints.|false| +|`druid.announcer.skipLoadSpec`|Skip segment LoadSpec from segment announcements. 
NOTE: Enabling this will also remove the loadSpec from coordinator and broker endpoints.|false| + diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeCuratorDataSegmentAnnouncerConfig.java b/server/src/main/java/io/druid/segment/realtime/RealtimeCuratorDataSegmentAnnouncerConfig.java deleted file mode 100644 index f040db388d88..000000000000 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeCuratorDataSegmentAnnouncerConfig.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License.
- */ - -package io.druid.segment.realtime; - -/** - */ -public abstract class RealtimeCuratorDataSegmentAnnouncerConfig -{ -} diff --git a/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java b/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java index 503649a5073d..8aef0f2ff41b 100644 --- a/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java +++ b/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java @@ -21,7 +21,9 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.inject.Inject; @@ -59,11 +61,12 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer private final Set availableZNodes = new ConcurrentSkipListSet(); private final Map segmentLookup = Maps.newConcurrentMap(); + private final Function segmentTransformer; @Inject public BatchDataSegmentAnnouncer( DruidServerMetadata server, - BatchDataSegmentAnnouncerConfig config, + final BatchDataSegmentAnnouncerConfig config, ZkPathsConfig zkPaths, Announcer announcer, ObjectMapper jsonMapper @@ -76,12 +79,31 @@ public BatchDataSegmentAnnouncer( this.server = server; this.liveSegmentLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), server.getName()); + segmentTransformer = new Function() + { + @Override + public DataSegment apply(DataSegment input) + { + DataSegment rv = input; + if (config.isSkipDimensions()) { + rv = rv.withDimensions(null); + } + if (config.isSkipMetrics()) { + rv = rv.withMetrics(null); + } + if (config.isSkipLoadSpec()) { + rv = rv.withLoadSpec(null); + } + return rv; + } + }; } @Override public void announceSegment(DataSegment segment) throws IOException { - int newBytesLen = 
jsonMapper.writeValueAsBytes(segment).length; + DataSegment toAnnounce = segmentTransformer.apply(segment); + int newBytesLen = jsonMapper.writeValueAsBytes(toAnnounce).length; if (newBytesLen > config.getMaxBytesPerNode()) { throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxBytesPerNode()); } @@ -94,11 +116,15 @@ public void announceSegment(DataSegment segment) throws IOException while (iter.hasNext() && !done) { SegmentZNode availableZNode = iter.next(); if (availableZNode.getBytes().length + newBytesLen < config.getMaxBytesPerNode()) { - availableZNode.addSegment(segment); + availableZNode.addSegment(toAnnounce); - log.info("Announcing segment[%s] at existing path[%s]", segment.getIdentifier(), availableZNode.getPath()); + log.info( + "Announcing segment[%s] at existing path[%s]", + toAnnounce.getIdentifier(), + availableZNode.getPath() + ); announcer.update(availableZNode.getPath(), availableZNode.getBytes()); - segmentLookup.put(segment, availableZNode); + segmentLookup.put(toAnnounce, availableZNode); if (availableZNode.getCount() >= config.getSegmentsPerNode()) { availableZNodes.remove(availableZNode); @@ -118,11 +144,11 @@ public void announceSegment(DataSegment segment) throws IOException // create new batch SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath()); - availableZNode.addSegment(segment); + availableZNode.addSegment(toAnnounce); - log.info("Announcing segment[%s] at new path[%s]", segment.getIdentifier(), availableZNode.getPath()); + log.info("Announcing segment[%s] at new path[%s]", toAnnounce.getIdentifier(), availableZNode.getPath()); announcer.announce(availableZNode.getPath(), availableZNode.getBytes()); - segmentLookup.put(segment, availableZNode); + segmentLookup.put(toAnnounce, availableZNode); availableZNodes.add(availableZNode); } } @@ -154,12 +180,13 @@ public void unannounceSegment(DataSegment segment) throws IOException @Override public void announceSegments(Iterable segments) throws IOException 
{ + Iterable toAnnounce = Iterables.transform(segments, segmentTransformer); SegmentZNode segmentZNode = new SegmentZNode(makeServedSegmentPath()); Set batch = Sets.newHashSet(); int byteSize = 0; int count = 0; - for (DataSegment segment : segments) { + for (DataSegment segment : toAnnounce) { int newBytesLen = jsonMapper.writeValueAsBytes(segment).length; if (newBytesLen > config.getMaxBytesPerNode()) { diff --git a/server/src/main/java/io/druid/server/initialization/BatchDataSegmentAnnouncerConfig.java b/server/src/main/java/io/druid/server/initialization/BatchDataSegmentAnnouncerConfig.java index 87f625116d00..7c31679ae9a5 100644 --- a/server/src/main/java/io/druid/server/initialization/BatchDataSegmentAnnouncerConfig.java +++ b/server/src/main/java/io/druid/server/initialization/BatchDataSegmentAnnouncerConfig.java @@ -37,6 +37,18 @@ public class BatchDataSegmentAnnouncerConfig @Min(1024) private long maxBytesPerNode = 512 * 1024; + // Skip LoadSpec from segment announcements + @JsonProperty + private boolean skipLoadSpec = false; + + // Skip dimension list from segment announcements + @JsonProperty + private boolean skipDimensions = false; + + // Skip metrics list from segment announcements + @JsonProperty + private boolean skipMetrics = false; + public int getSegmentsPerNode() { return segmentsPerNode; @@ -46,4 +58,19 @@ public long getMaxBytesPerNode() { return maxBytesPerNode; } + + public boolean isSkipLoadSpec() + { + return skipLoadSpec; + } + + public boolean isSkipDimensions() + { + return skipDimensions; + } + + public boolean isSkipMetrics() + { + return skipMetrics; + } } diff --git a/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java b/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java index 08024ed7d1ad..9c1ef0992b92 100644 --- a/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java +++ 
b/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java @@ -23,6 +23,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.util.concurrent.MoreExecutors; import io.druid.curator.PotentiallyGzippedCompressionProvider; @@ -66,6 +69,10 @@ public class BatchDataSegmentAnnouncerTest private Set testSegments; private final AtomicInteger maxBytesPerNode = new AtomicInteger(512 * 1024); + private Boolean skipDimensions; + private Boolean skipMetrics; + private Boolean skipLoadSpec; + @Before public void setUp() throws Exception @@ -91,6 +98,9 @@ public void setUp() throws Exception announcer.start(); segmentReader = new SegmentReader(cf, jsonMapper); + skipDimensions = false; + skipMetrics = false; + skipLoadSpec = false; segmentAnnouncer = new BatchDataSegmentAnnouncer( new DruidServerMetadata( "id", @@ -113,6 +123,24 @@ public long getMaxBytesPerNode() { return maxBytesPerNode.get(); } + + @Override + public boolean isSkipDimensions() + { + return skipDimensions; + } + + @Override + public boolean isSkipMetrics() + { + return skipMetrics; + } + + @Override + public boolean isSkipLoadSpec() + { + return skipLoadSpec; + } }, new ZkPathsConfig() { @@ -177,13 +205,79 @@ public void testSingleAnnounce() throws Exception Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty()); } + @Test + public void testSkipDimensions() throws Exception + { + skipDimensions = true; + Iterator segIter = testSegments.iterator(); + DataSegment firstSegment = segIter.next(); + + segmentAnnouncer.announceSegment(firstSegment); + + List zNodes = cf.getChildren().forPath(testSegmentsPath); + + for (String zNode : zNodes) { + DataSegment announcedSegment = 
Iterables.getOnlyElement(segmentReader.read(joiner.join(testSegmentsPath, zNode))); + Assert.assertEquals(announcedSegment, firstSegment); + Assert.assertTrue(announcedSegment.getDimensions().isEmpty()); + } + + segmentAnnouncer.unannounceSegment(firstSegment); + + Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty()); + } + + @Test + public void testSkipMetrics() throws Exception + { + skipMetrics = true; + Iterator segIter = testSegments.iterator(); + DataSegment firstSegment = segIter.next(); + + segmentAnnouncer.announceSegment(firstSegment); + + List zNodes = cf.getChildren().forPath(testSegmentsPath); + + for (String zNode : zNodes) { + DataSegment announcedSegment = Iterables.getOnlyElement(segmentReader.read(joiner.join(testSegmentsPath, zNode))); + Assert.assertEquals(announcedSegment, firstSegment); + Assert.assertTrue(announcedSegment.getMetrics().isEmpty()); + } + + segmentAnnouncer.unannounceSegment(firstSegment); + + Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty()); + } + + @Test + public void testSkipLoadSpec() throws Exception + { + skipLoadSpec = true; + Iterator segIter = testSegments.iterator(); + DataSegment firstSegment = segIter.next(); + + segmentAnnouncer.announceSegment(firstSegment); + + List zNodes = cf.getChildren().forPath(testSegmentsPath); + + for (String zNode : zNodes) { + DataSegment announcedSegment = Iterables.getOnlyElement(segmentReader.read(joiner.join(testSegmentsPath, zNode))); + Assert.assertEquals(announcedSegment, firstSegment); + Assert.assertNull(announcedSegment.getLoadSpec()); + } + + segmentAnnouncer.unannounceSegment(firstSegment); + + Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty()); + } + @Test public void testSingleAnnounceManyTimes() throws Exception { int prevMax = maxBytesPerNode.get(); maxBytesPerNode.set(2048); - // each segment is about 317 bytes long and that makes 2048 / 317 = 6 segments included per node - // so 100 segments makes (100 / 6) 
+ 1 = 17 nodes + // each segment is about 348 bytes long and that makes 2048 / 348 = 5 segments included per node + // so 100 segments makes 100 / 5 = 20 nodes try { for (DataSegment segment : testSegments) { segmentAnnouncer.announceSegment(segment); @@ -194,7 +288,7 @@ public void testSingleAnnounceManyTimes() throws Exception } List zNodes = cf.getChildren().forPath(testSegmentsPath); - Assert.assertEquals(17, zNodes.size()); + Assert.assertEquals(20, zNodes.size()); Set segments = Sets.newHashSet(testSegments); for (String zNode : zNodes) { @@ -244,6 +338,9 @@ private DataSegment makeSegment(int offset) ) ) .version(new DateTime().toString()) + .dimensions(ImmutableList.of("dim1", "dim2")) + .metrics(ImmutableList.of("met1", "met2")) + .loadSpec(ImmutableMap.of("type", "local")) .build(); }