Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/src/main/java/io/druid/timeline/DataSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ public DataSegment withDimensions(List<String> dimensions)
return builder(this).dimensions(dimensions).build();
}

/**
 * Builds a DataSegment from a builder seeded with this segment, with the metrics list set to
 * {@code metrics}.
 *
 * @param metrics metric names to set on the resulting segment
 *                (NOTE(review): null handling is decided by the builder, which is not shown here)
 * @return the segment produced by the builder
 */
public DataSegment withMetrics(List<String> metrics)
{
return builder(this).metrics(metrics).build();
}

public DataSegment withSize(long size)
{
return builder(this).size(size).build();
Expand Down
4 changes: 4 additions & 0 deletions docs/content/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,7 @@ In current Druid, multiple data segments may be announced under the same Znode.
|--------|-----------|-------|
|`druid.announcer.segmentsPerNode`|Each Znode contains info for up to this many segments.|50|
|`druid.announcer.maxBytesPerNode`|Max byte size for Znode.|524288|
|`druid.announcer.skipDimensions`|Omit the dimensions list from segment announcements. NOTE: Enabling this will also remove the dimensions list from coordinator and broker endpoints.|false|
|`druid.announcer.skipMetrics`|Omit the metrics list from segment announcements. NOTE: Enabling this will also remove the metrics list from coordinator and broker endpoints.|false|
|`druid.announcer.skipLoadSpec`|Omit the segment loadSpec from segment announcements. NOTE: Enabling this will also remove the loadSpec from coordinator and broker endpoints.|false|

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
Expand Down Expand Up @@ -59,11 +61,12 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer

private final Set<SegmentZNode> availableZNodes = new ConcurrentSkipListSet<SegmentZNode>();
private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newConcurrentMap();
private final Function<DataSegment, DataSegment> segmentTransformer;

@Inject
public BatchDataSegmentAnnouncer(
DruidServerMetadata server,
BatchDataSegmentAnnouncerConfig config,
final BatchDataSegmentAnnouncerConfig config,
ZkPathsConfig zkPaths,
Announcer announcer,
ObjectMapper jsonMapper
Expand All @@ -76,12 +79,31 @@ public BatchDataSegmentAnnouncer(
this.server = server;

this.liveSegmentLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), server.getName());
segmentTransformer = new Function<DataSegment, DataSegment>()
{
@Override
public DataSegment apply(DataSegment input)
{
DataSegment rv = input;
if (config.isSkipDimensions()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these will need to be documented

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added docs

rv = rv.withDimensions(null);
}
if (config.isSkipMetrics()) {
rv = rv.withMetrics(null);
}
if (config.isSkipLoadSpec()) {
rv = rv.withLoadSpec(null);
}
return rv;
}
};
}

@Override
public void announceSegment(DataSegment segment) throws IOException
{
int newBytesLen = jsonMapper.writeValueAsBytes(segment).length;
DataSegment toAnnounce = segmentTransformer.apply(segment);
int newBytesLen = jsonMapper.writeValueAsBytes(toAnnounce).length;
if (newBytesLen > config.getMaxBytesPerNode()) {
throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxBytesPerNode());
}
Expand All @@ -94,11 +116,15 @@ public void announceSegment(DataSegment segment) throws IOException
while (iter.hasNext() && !done) {
SegmentZNode availableZNode = iter.next();
if (availableZNode.getBytes().length + newBytesLen < config.getMaxBytesPerNode()) {
availableZNode.addSegment(segment);
availableZNode.addSegment(toAnnounce);

log.info("Announcing segment[%s] at existing path[%s]", segment.getIdentifier(), availableZNode.getPath());
log.info(
"Announcing segment[%s] at existing path[%s]",
toAnnounce.getIdentifier(),
availableZNode.getPath()
);
announcer.update(availableZNode.getPath(), availableZNode.getBytes());
segmentLookup.put(segment, availableZNode);
segmentLookup.put(toAnnounce, availableZNode);

if (availableZNode.getCount() >= config.getSegmentsPerNode()) {
availableZNodes.remove(availableZNode);
Expand All @@ -118,11 +144,11 @@ public void announceSegment(DataSegment segment) throws IOException
// create new batch

SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath());
availableZNode.addSegment(segment);
availableZNode.addSegment(toAnnounce);

log.info("Announcing segment[%s] at new path[%s]", segment.getIdentifier(), availableZNode.getPath());
log.info("Announcing segment[%s] at new path[%s]", toAnnounce.getIdentifier(), availableZNode.getPath());
announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
segmentLookup.put(segment, availableZNode);
segmentLookup.put(toAnnounce, availableZNode);
availableZNodes.add(availableZNode);
}
}
Expand Down Expand Up @@ -154,12 +180,13 @@ public void unannounceSegment(DataSegment segment) throws IOException
@Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
Iterable<DataSegment> toAnnounce = Iterables.transform(segments, segmentTransformer);
SegmentZNode segmentZNode = new SegmentZNode(makeServedSegmentPath());
Set<DataSegment> batch = Sets.newHashSet();
int byteSize = 0;
int count = 0;

for (DataSegment segment : segments) {
for (DataSegment segment : toAnnounce) {
int newBytesLen = jsonMapper.writeValueAsBytes(segment).length;

if (newBytesLen > config.getMaxBytesPerNode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ public class BatchDataSegmentAnnouncerConfig
@Min(1024)
private long maxBytesPerNode = 512 * 1024;

// Skip LoadSpec from segment announcements
@JsonProperty
private boolean skipLoadSpec = false;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does removing loadspec have any impact on the information that the coordinator exposes via endpoints?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The coordinator has an endpoint for getting info for a particular segmentId, which includes the loadSpec too.


// Skip dimension list from segment announcements
@JsonProperty
private boolean skipDimensions = false;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need separate configs for dimensions and metrics? Is there any use in having one but not the other?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added separate configs in case someone has tools built on top that need the metrics list but not the dimensions list, or vice versa.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it actually make sense to have one but not the other?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

created #2821 for follow up.


// Skip metrics list from segment announcements
@JsonProperty
private boolean skipMetrics = false;

public int getSegmentsPerNode()
{
return segmentsPerNode;
Expand All @@ -46,4 +58,19 @@ public long getMaxBytesPerNode()
{
return maxBytesPerNode;
}

/**
 * @return the configured {@code skipLoadSpec} flag, i.e. whether the LoadSpec is skipped
 *         from segment announcements
 */
public boolean isSkipLoadSpec()
{
  return this.skipLoadSpec;
}

/**
 * @return the configured {@code skipDimensions} flag, i.e. whether the dimension list is
 *         skipped from segment announcements
 */
public boolean isSkipDimensions()
{
  return this.skipDimensions;
}

/**
 * @return the configured {@code skipMetrics} flag, i.e. whether the metrics list is skipped
 *         from segment announcements
 */
public boolean isSkipMetrics()
{
  return this.skipMetrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
Expand Down Expand Up @@ -66,6 +69,10 @@ public class BatchDataSegmentAnnouncerTest
private Set<DataSegment> testSegments;

private final AtomicInteger maxBytesPerNode = new AtomicInteger(512 * 1024);
// Per-test switches returned by the config overrides handed to the announcer under test.
// Declared as primitives: the overrides return boolean, so a boxed null would throw a
// NullPointerException on unboxing if a flag were ever read before being assigned.
private boolean skipDimensions;
private boolean skipMetrics;
private boolean skipLoadSpec;


@Before
public void setUp() throws Exception
Expand All @@ -91,6 +98,9 @@ public void setUp() throws Exception
announcer.start();

segmentReader = new SegmentReader(cf, jsonMapper);
skipDimensions = false;
skipMetrics = false;
skipLoadSpec = false;
segmentAnnouncer = new BatchDataSegmentAnnouncer(
new DruidServerMetadata(
"id",
Expand All @@ -113,6 +123,24 @@ public long getMaxBytesPerNode()
{
return maxBytesPerNode.get();
}

// Returns the enclosing test's skipDimensions field on every call, so a test can flip the
// flag after setUp() has already constructed the announcer.
@Override
public boolean isSkipDimensions()
{
return skipDimensions;
}

// Returns the enclosing test's skipMetrics field on every call, so a test can flip the
// flag after setUp() has already constructed the announcer.
@Override
public boolean isSkipMetrics()
{
return skipMetrics;
}

// Returns the enclosing test's skipLoadSpec field on every call, so a test can flip the
// flag after setUp() has already constructed the announcer.
@Override
public boolean isSkipLoadSpec()
{
return skipLoadSpec;
}
},
new ZkPathsConfig()
{
Expand Down Expand Up @@ -177,13 +205,79 @@ public void testSingleAnnounce() throws Exception
Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
}

/**
 * Announces one segment with {@code skipDimensions} enabled and checks that every announced
 * copy read back from ZooKeeper still equals the original segment but has an empty
 * dimensions list. Finally unannounces the segment and checks that no znodes remain.
 */
@Test
public void testSkipDimensions() throws Exception
{
  skipDimensions = true;
  DataSegment firstSegment = testSegments.iterator().next();

  segmentAnnouncer.announceSegment(firstSegment);

  List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);

  for (String zNode : zNodes) {
    DataSegment announcedSegment = Iterables.getOnlyElement(segmentReader.read(joiner.join(testSegmentsPath, zNode)));
    // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
    Assert.assertEquals(firstSegment, announcedSegment);
    Assert.assertTrue(announcedSegment.getDimensions().isEmpty());
  }

  segmentAnnouncer.unannounceSegment(firstSegment);

  Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
}

/**
 * Announces one segment with {@code skipMetrics} enabled and checks that every announced
 * copy read back from ZooKeeper still equals the original segment but has an empty
 * metrics list. Finally unannounces the segment and checks that no znodes remain.
 */
@Test
public void testSkipMetrics() throws Exception
{
  skipMetrics = true;
  DataSegment firstSegment = testSegments.iterator().next();

  segmentAnnouncer.announceSegment(firstSegment);

  List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);

  for (String zNode : zNodes) {
    DataSegment announcedSegment = Iterables.getOnlyElement(segmentReader.read(joiner.join(testSegmentsPath, zNode)));
    // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
    Assert.assertEquals(firstSegment, announcedSegment);
    Assert.assertTrue(announcedSegment.getMetrics().isEmpty());
  }

  segmentAnnouncer.unannounceSegment(firstSegment);

  Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
}

/**
 * Announces one segment with {@code skipLoadSpec} enabled and checks that every announced
 * copy read back from ZooKeeper still equals the original segment but has a null loadSpec.
 * Finally unannounces the segment and checks that no znodes remain.
 */
@Test
public void testSkipLoadSpec() throws Exception
{
  skipLoadSpec = true;
  DataSegment firstSegment = testSegments.iterator().next();

  segmentAnnouncer.announceSegment(firstSegment);

  List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);

  for (String zNode : zNodes) {
    DataSegment announcedSegment = Iterables.getOnlyElement(segmentReader.read(joiner.join(testSegmentsPath, zNode)));
    // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
    Assert.assertEquals(firstSegment, announcedSegment);
    Assert.assertNull(announcedSegment.getLoadSpec());
  }

  segmentAnnouncer.unannounceSegment(firstSegment);

  Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty());
}

@Test
public void testSingleAnnounceManyTimes() throws Exception
{
int prevMax = maxBytesPerNode.get();
maxBytesPerNode.set(2048);
// each segment is about 317 bytes long and that makes 2048 / 317 = 6 segments included per node
// so 100 segments makes (100 / 6) + 1 = 17 nodes
// each segment is about 348 bytes long and that makes 2048 / 348 = 5 segments included per node
// so 100 segments makes 100 / 5 = 20 nodes
try {
for (DataSegment segment : testSegments) {
segmentAnnouncer.announceSegment(segment);
Expand All @@ -194,7 +288,7 @@ public void testSingleAnnounceManyTimes() throws Exception
}

List<String> zNodes = cf.getChildren().forPath(testSegmentsPath);
Assert.assertEquals(17, zNodes.size());
Assert.assertEquals(20, zNodes.size());

Set<DataSegment> segments = Sets.newHashSet(testSegments);
for (String zNode : zNodes) {
Expand Down Expand Up @@ -244,6 +338,9 @@ private DataSegment makeSegment(int offset)
)
)
.version(new DateTime().toString())
.dimensions(ImmutableList.<String>of("dim1", "dim2"))
.metrics(ImmutableList.<String>of("met1", "met2"))
.loadSpec(ImmutableMap.<String, Object>of("type", "local"))
.build();
}

Expand Down