Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
6025bcd
Move the overshadowed segment computation to SQLMetadataSegmentManage…
May 6, 2019
1964b33
rename method in MetadataSegmentManager
May 6, 2019
a32dc08
Fix tests
May 6, 2019
88a3620
Merge branch 'master' of github.com:druid-io/druid into optimize-over…
May 6, 2019
258bacc
PR comments
May 8, 2019
700e9bc
Merge branch 'master' of github.com:druid-io/druid into optimize-over…
May 8, 2019
18093ad
Merge branch 'master' of github.com:druid-io/druid into optimize-over…
May 24, 2019
fd7fdde
PR comments
May 25, 2019
34773c7
Merge branch 'master' of github.com:druid-io/druid into optimize-over…
May 25, 2019
8d2879e
PR comments
May 29, 2019
42f0349
Merge branch 'master' of github.com:druid-io/druid into optimize-over…
May 29, 2019
a8af218
fix indentation
May 29, 2019
90e4454
fix tests
May 29, 2019
134d46e
Merge branch 'master' of github.com:druid-io/druid into optimize-over…
May 29, 2019
2bb7d79
fix test
May 29, 2019
41d7182
Merge branch 'master' of github.com:druid-io/druid into optimize-over…
May 29, 2019
1e2f28e
add test for SegmentWithOvershadowedStatus serde format
May 29, 2019
9aefc2d
Merge branch 'master' of github.com:druid-io/druid into optimize-over…
May 29, 2019
bae854c
PR comments
May 30, 2019
0eed90d
Merge branch 'master' of github.com:druid-io/druid into optimize-over…
May 30, 2019
c6e88aa
PR comments
May 31, 2019
3b0fb27
Merge branch 'master' of github.com:druid-io/druid into optimize-over…
May 31, 2019
7361f83
fix test
May 31, 2019
a580229
Merge branch 'master' of github.com:druid-io/druid into optimize-over…
May 31, 2019
fdd8e2c
remove snapshot updates outside poll
Jun 3, 2019
48fd3f5
Merge branch 'master' of github.com:druid-io/druid into optimize-over…
Jun 3, 2019
9090586
PR comments
Jun 4, 2019
b6ed0de
Merge branch 'master' of github.com:druid-io/druid into optimize-over…
Jun 4, 2019
ed019c3
PR comments
Jun 5, 2019
334dbd7
Merge branch 'master' of github.com:druid-io/druid into optimize-over…
Jun 5, 2019
1183f1f
PR comments
Jun 6, 2019
8911b6a
Merge branch 'master' of github.com:druid-io/druid into optimize-over…
Jun 6, 2019
59395fa
removed unused import
Jun 6, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonUnwrapped;

/**
* DataSegment object plus the overshadowed status for the segment. An immutable object.
Expand All @@ -31,6 +32,12 @@
public class SegmentWithOvershadowedStatus implements Comparable<SegmentWithOvershadowedStatus>
{
private final boolean overshadowed;
/**
* dataSegment is serialized "unwrapped", i.e. its properties are included as properties of the
* enclosing class. If, in the future, {@code SegmentWithOvershadowedStatus} were to extend {@link DataSegment},
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest adding a test that verifies that the serialized form generated with the unwrapped annotation can be successfully deserialized as a subclass (can make a test impl) of DataSegment

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, should test this, will try to add one.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a test

* there will be no change in the serialized format.
*/
@JsonUnwrapped
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please also ensure that the data is sent in smile format, not JSON, between Coordinator and Brokers?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

created #7798

private final DataSegment dataSegment;

@JsonCreator
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/java/org/apache/druid/utils/CollectionUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@

package org.apache.druid.utils;

import com.google.common.collect.Maps;

import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Spliterator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

Expand Down Expand Up @@ -68,6 +72,17 @@ public int size()
};
}

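/**
* Returns a new map with the same keys as the given input map, in which every value is the result of applying the
* given valueMapper function to the corresponding input value. Unlike {@link Maps#transformValues}, which returns a
* lazy view, this method applies the function eagerly to all entries and returns an independent map.
*/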
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To the list of things to address in a follow-up PR: highlight the difference from Maps.transformValues() in the documentation, e. g. add a sentence like Unlike {@link Maps#transformValues}, this method applies the mapping function eagerly to all key-value pairs in the source map and returns a new {@link HashMap}, while {@link Maps#transformValues} returns a lazy map view.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, will do. Thanks.

public static <K, V, V2> Map<K, V2> mapValues(Map<K, V> map, Function<V, V2> valueMapper)
{
  // Eagerly build an independent result map, sized up front for the number of source entries.
  final Map<K, V2> transformed = Maps.newHashMapWithExpectedSize(map.size());
  for (Map.Entry<K, V> entry : map.entrySet()) {
    transformed.put(entry.getKey(), valueMapper.apply(entry.getValue()));
  }
  return transformed;
}

// Private no-arg constructor with an empty body: prevents instantiation from outside this class.
private CollectionUtils()
{
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.timeline;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.TestObjectMapper;
import org.apache.druid.jackson.CommaListJoinDeserializer;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * Verifies the "unwrapped" JSON format of {@link SegmentWithOvershadowedStatus}: the properties of the wrapped
 * {@link DataSegment} appear at the top level next to the "overshadowed" flag, and the same JSON can be read back
 * as a subclass of {@link DataSegment} that carries the flag itself.
 */
public class SegmentWithOvershadowedStatusTest
{
  private static final ObjectMapper mapper = new TestObjectMapper();
  private static final int TEST_VERSION = 0x9;

  @Before
  public void setUp()
  {
    // DataSegment's deserializer takes an injected PruneLoadSpecHolder; register the default one.
    InjectableValues.Std injectableValues = new InjectableValues.Std();
    injectableValues.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT);
    mapper.setInjectableValues(injectableValues);
  }

  @Test
  public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws Exception
  {
    final Interval interval = Intervals.of("2011-10-01/2011-10-02");
    final ImmutableMap<String, Object> loadSpec = ImmutableMap.of("something", "or_other");

    final DataSegment dataSegment = new DataSegment(
        "something",
        interval,
        "1",
        loadSpec,
        Arrays.asList("dim1", "dim2"),
        Arrays.asList("met1", "met2"),
        NoneShardSpec.instance(),
        TEST_VERSION,
        1
    );

    // Use 'true' rather than 'false': 'false' is also the default of a primitive boolean, so a flag that got lost
    // during the round trip would go unnoticed.
    final SegmentWithOvershadowedStatus segment = new SegmentWithOvershadowedStatus(dataSegment, true);

    // Serialize once and reuse the same JSON for both the map-level and the object-level checks.
    final String json = mapper.writeValueAsString(segment);

    final Map<String, Object> objectMap = mapper.readValue(
        json,
        JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
    );

    // All DataSegment properties (including "identifier") plus "overshadowed", all at the top level.
    Assert.assertEquals(11, objectMap.size());
    Assert.assertEquals("something", objectMap.get("dataSource"));
    Assert.assertEquals(interval.toString(), objectMap.get("interval"));
    Assert.assertEquals("1", objectMap.get("version"));
    Assert.assertEquals(loadSpec, objectMap.get("loadSpec"));
    Assert.assertEquals("dim1,dim2", objectMap.get("dimensions"));
    Assert.assertEquals("met1,met2", objectMap.get("metrics"));
    Assert.assertEquals(ImmutableMap.of("type", "none"), objectMap.get("shardSpec"));
    Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
    Assert.assertEquals(1, objectMap.get("size"));
    Assert.assertEquals(true, objectMap.get("overshadowed"));

    final TestSegmentWithOvershadowedStatus deserializedSegment = mapper.readValue(
        json,
        TestSegmentWithOvershadowedStatus.class
    );

    Assert.assertEquals(segment.getDataSegment().getDataSource(), deserializedSegment.getDataSource());
    Assert.assertEquals(segment.getDataSegment().getInterval(), deserializedSegment.getInterval());
    Assert.assertEquals(segment.getDataSegment().getVersion(), deserializedSegment.getVersion());
    Assert.assertEquals(segment.getDataSegment().getLoadSpec(), deserializedSegment.getLoadSpec());
    Assert.assertEquals(segment.getDataSegment().getDimensions(), deserializedSegment.getDimensions());
    Assert.assertEquals(segment.getDataSegment().getMetrics(), deserializedSegment.getMetrics());
    Assert.assertEquals(segment.getDataSegment().getShardSpec(), deserializedSegment.getShardSpec());
    Assert.assertEquals(segment.getDataSegment().getSize(), deserializedSegment.getSize());
    Assert.assertEquals(segment.getDataSegment().getId(), deserializedSegment.getId());
    // The overshadowed flag must survive the round trip as well.
    Assert.assertTrue(deserializedSegment.isOvershadowed());
  }
}

/**
 * Test-only subclass of {@link DataSegment} that additionally carries the overshadowed status. It is used to check
 * that JSON written for the "unwrapped" SegmentWithOvershadowedStatus can be deserialized into a class that extends
 * DataSegment.
 */
class TestSegmentWithOvershadowedStatus extends DataSegment
{
  private final boolean overshadowed;

  @JsonCreator
  public TestSegmentWithOvershadowedStatus(
      @JsonProperty("dataSource") String dataSource,
      @JsonProperty("interval") Interval interval,
      @JsonProperty("version") String version,
      @JsonProperty("loadSpec") @Nullable Map<String, Object> loadSpec,
      @JsonProperty("dimensions")
      @JsonDeserialize(using = CommaListJoinDeserializer.class)
      @Nullable
          List<String> dimensions,
      @JsonProperty("metrics")
      @JsonDeserialize(using = CommaListJoinDeserializer.class)
      @Nullable
          List<String> metrics,
      @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
      @JsonProperty("binaryVersion") Integer binaryVersion,
      @JsonProperty("size") long size,
      @JsonProperty("overshadowed") boolean overshadowed
  )
  {
    super(
        dataSource,
        interval,
        version,
        loadSpec,
        dimensions,
        metrics,
        shardSpec,
        binaryVersion,
        size
    );
    this.overshadowed = overshadowed;
  }

  @JsonProperty
  public boolean isOvershadowed()
  {
    return overshadowed;
  }

  /**
   * Two instances are equal when the superclass considers them equal and their overshadowed flags match.
   */
  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (!(o instanceof TestSegmentWithOvershadowedStatus)) {
      return false;
    }
    if (!super.equals(o)) {
      return false;
    }
    final TestSegmentWithOvershadowedStatus that = (TestSegmentWithOvershadowedStatus) o;
    return overshadowed == that.overshadowed;
  }

  /**
   * Kept consistent with {@link #equals}: combines the superclass hash with the overshadowed flag, so that
   * instances which differ only in the flag do not systematically collide.
   */
  @Override
  public int hashCode()
  {
    return 31 * super.hashCode() + Boolean.hashCode(overshadowed);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ Pair<SortedMap<Interval, String>, Map<Interval, List<DataSegment>>> checkSegment
// drop derivative segments which interval equals the interval in toDeleteBaseSegments
for (Interval interval : toDropInterval.keySet()) {
for (DataSegment segment : derivativeSegments.get(interval)) {
segmentManager.removeSegment(segment.getId());
segmentManager.removeSegment(segment.getId().toString());
}
}
// data of the latest interval will be built firstly.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void setup() throws IOException

expectedUsedSegments.forEach(s -> actionTestKit.getTaskLockbox().unlock(task, s.getInterval()));

expectedUnusedSegments.forEach(s -> actionTestKit.getMetadataSegmentManager().removeSegment(s.getId()));
expectedUnusedSegments.forEach(s -> actionTestKit.getMetadataSegmentManager().removeSegment(s.getId().toString()));
}

private DataSegment createSegment(Interval interval, String version)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@
"is_available": 1,
"is_realtime": 0,
"is_overshadowed": 0,
"payload": "{\"dataSegment\":{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space\":\"\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist,unique_users,variation\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":446027801,\"identifier\":\"auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\"},\"overshadowed\":false}"
"payload": "{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space\":\"\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist,unique_users,variation\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":446027801,\"identifier\":\"auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",\"overshadowed\":false}"
}
]
114 changes: 114 additions & 0 deletions server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.client;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Ordering;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * An immutable snapshot of fields from {@link org.apache.druid.metadata.SQLMetadataSegmentManager} (dataSources and
 * overshadowedSegments). Getters of {@link org.apache.druid.metadata.MetadataSegmentManager} should use this snapshot
 * to return dataSources and overshadowedSegments.
 *
 * The set of overshadowed segments is computed once, in the constructor. The given dataSources map is stored by
 * reference (it is not copied) and is handed out as-is by {@link #getDataSourcesMap()}, so it must not be modified
 * after it has been passed to the constructor.
 */
public class DataSourcesSnapshot
{
  private final Map<String, ImmutableDruidDataSource> dataSources;
  private final ImmutableSet<SegmentId> overshadowedSegments;

  /**
   * Creates a snapshot of the given dataSources and eagerly computes the overshadowed segments among them.
   *
   * @param dataSources map of dataSource name to dataSource; must not be null
   *
   * @throws NullPointerException if dataSources is null
   */
  public DataSourcesSnapshot(
      Map<String, ImmutableDruidDataSource> dataSources
  )
  {
    this.dataSources = dataSources;
    this.overshadowedSegments = ImmutableSet.copyOf(determineOvershadowedSegments());
  }

  /**
   * Returns the values view of the underlying dataSources map.
   */
  public Collection<ImmutableDruidDataSource> getDataSources()
  {
    return dataSources.values();
  }

  /**
   * Returns the underlying map that was passed to the constructor (not a copy).
   */
  public Map<String, ImmutableDruidDataSource> getDataSourcesMap()
  {
    return dataSources;
  }

  /**
   * Returns the dataSource with the given name, or null if the snapshot has no entry for that name.
   */
  @Nullable
  public ImmutableDruidDataSource getDataSource(String dataSourceName)
  {
    return dataSources.get(dataSourceName);
  }

  /**
   * Returns the ids of the segments that were found to be overshadowed when this snapshot was created.
   */
  public ImmutableSet<SegmentId> getOvershadowedSegments()
  {
    return overshadowedSegments;
  }

  /**
   * Returns an iterable over the segments of all dataSources in this snapshot. Every call to
   * {@link Iterable#iterator()} on the returned object starts a fresh traversal. This method never returns null:
   * the constructor already dereferences the dataSources map, so an instance with a null map cannot exist.
   */
  public Iterable<DataSegment> iterateAllSegmentsInSnapshot()
  {
    return () -> dataSources.values().stream()
                            .flatMap(dataSource -> dataSource.getSegments().stream())
                            .iterator();
  }

  /**
   * This method builds timelines from all dataSources and finds the overshadowed segments list
   *
   * @return overshadowed segment Ids list
   */
  private List<SegmentId> determineOvershadowedSegments()
  {
    // Collected into a list because the segments are traversed twice: once to build the timelines and once to
    // query them.
    final List<DataSegment> segments = dataSources.values().stream()
                                                  .flatMap(ds -> ds.getSegments().stream())
                                                  .collect(Collectors.toList());
    final Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
    segments.forEach(segment -> timelines
        .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural()))
        .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)));

    // It's fine to add all overshadowed segments to a single collection because only
    // a small fraction of the segments in the cluster are expected to be overshadowed,
    // so building this collection shouldn't generate a lot of garbage.
    final List<SegmentId> overshadowedSegments = new ArrayList<>();
    for (DataSegment dataSegment : segments) {
      // A timeline was registered above for the dataSource of every segment in the list, so this lookup always
      // finds one.
      final VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(dataSegment.getDataSource());
      if (timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) {
        overshadowedSegments.add(dataSegment.getId());
      }
    }
    return overshadowedSegments;
  }
}
Loading