Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonUnwrapped;

/**
* DataSegment object plus the overshadowed status for the segment. An immutable object.
Expand All @@ -31,6 +32,12 @@
public class SegmentWithOvershadowedStatus implements Comparable<SegmentWithOvershadowedStatus>
{
private final boolean overshadowed;
/**
* dataSegment is serialized "unwrapped", i.e. it's properties are included as properties of
* enclosing class. If in future, if {@Code SegmentWithOvershadowedStatus} were to extend {@link DataSegment},
* there will be no change in the serialized format.
*/
@JsonUnwrapped
private final DataSegment dataSegment;

@JsonCreator
Expand Down
19 changes: 18 additions & 1 deletion core/src/main/java/org/apache/druid/utils/CollectionUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@

package org.apache.druid.utils;

import com.google.common.collect.Maps;

import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Spliterator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

Expand Down Expand Up @@ -68,5 +72,18 @@ public int size()
};
}

private CollectionUtils() {}
/**
* Returns a transformed map from the given input map where the value is modified based on the given valueMapper
* function.
*/
public static <K, V, V2> Map<K, V2> mapValues(Map<K, V> map, Function<V, V2> valueMapper)
{
final Map<K, V2> result = Maps.newHashMapWithExpectedSize(map.size());
map.forEach((k, v) -> result.put(k, valueMapper.apply(v)));
return result;
}

private CollectionUtils()
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.timeline;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.TestObjectMapper;
import org.apache.druid.jackson.CommaListJoinDeserializer;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class SegmentWithOvershadowedStatusTest
{
private static final ObjectMapper mapper = new TestObjectMapper();
private static final int TEST_VERSION = 0x9;

@Before
public void setUp()
{
InjectableValues.Std injectableValues = new InjectableValues.Std();
injectableValues.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT);
mapper.setInjectableValues(injectableValues);
}

@Test
public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws Exception
{
final Interval interval = Intervals.of("2011-10-01/2011-10-02");
final ImmutableMap<String, Object> loadSpec = ImmutableMap.of("something", "or_other");

final DataSegment dataSegment = new DataSegment(
"something",
interval,
"1",
loadSpec,
Arrays.asList("dim1", "dim2"),
Arrays.asList("met1", "met2"),
NoneShardSpec.instance(),
TEST_VERSION,
1
);

final SegmentWithOvershadowedStatus segment = new SegmentWithOvershadowedStatus(dataSegment, false);

final Map<String, Object> objectMap = mapper.readValue(
mapper.writeValueAsString(segment),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);

Assert.assertEquals(11, objectMap.size());
Assert.assertEquals("something", objectMap.get("dataSource"));
Assert.assertEquals(interval.toString(), objectMap.get("interval"));
Assert.assertEquals("1", objectMap.get("version"));
Assert.assertEquals(loadSpec, objectMap.get("loadSpec"));
Assert.assertEquals("dim1,dim2", objectMap.get("dimensions"));
Assert.assertEquals("met1,met2", objectMap.get("metrics"));
Assert.assertEquals(ImmutableMap.of("type", "none"), objectMap.get("shardSpec"));
Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
Assert.assertEquals(1, objectMap.get("size"));
Assert.assertEquals(false, objectMap.get("overshadowed"));

final String json = mapper.writeValueAsString(segment);

final TestSegmentWithOvershadowedStatus deserializedSegment = mapper.readValue(
json,
TestSegmentWithOvershadowedStatus.class
);

Assert.assertEquals(segment.getDataSegment().getDataSource(), deserializedSegment.getDataSource());
Assert.assertEquals(segment.getDataSegment().getInterval(), deserializedSegment.getInterval());
Assert.assertEquals(segment.getDataSegment().getVersion(), deserializedSegment.getVersion());
Assert.assertEquals(segment.getDataSegment().getLoadSpec(), deserializedSegment.getLoadSpec());
Assert.assertEquals(segment.getDataSegment().getDimensions(), deserializedSegment.getDimensions());
Assert.assertEquals(segment.getDataSegment().getMetrics(), deserializedSegment.getMetrics());
Assert.assertEquals(segment.getDataSegment().getShardSpec(), deserializedSegment.getShardSpec());
Assert.assertEquals(segment.getDataSegment().getSize(), deserializedSegment.getSize());
Assert.assertEquals(segment.getDataSegment().getId(), deserializedSegment.getId());

}
}

/**
* Subclass of DataSegment with overshadowed status
*/
class TestSegmentWithOvershadowedStatus extends DataSegment
{
private final boolean overshadowed;

@JsonCreator
public TestSegmentWithOvershadowedStatus(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("version") String version,
@JsonProperty("loadSpec") @Nullable Map<String, Object> loadSpec,
@JsonProperty("dimensions")
@JsonDeserialize(using = CommaListJoinDeserializer.class)
@Nullable
List<String> dimensions,
@JsonProperty("metrics")
@JsonDeserialize(using = CommaListJoinDeserializer.class)
@Nullable
List<String> metrics,
@JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
@JsonProperty("binaryVersion") Integer binaryVersion,
@JsonProperty("size") long size,
@JsonProperty("overshadowed") boolean overshadowed
)
{
super(
dataSource,
interval,
version,
loadSpec,
dimensions,
metrics,
shardSpec,
binaryVersion,
size
);
this.overshadowed = overshadowed;
}

@JsonProperty
public boolean isOvershadowed()
{
return overshadowed;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof TestSegmentWithOvershadowedStatus)) {
return false;
}
if (!super.equals(o)) {
return false;
}
final TestSegmentWithOvershadowedStatus that = (TestSegmentWithOvershadowedStatus) o;
if (overshadowed != (that.overshadowed)) {
return false;
}
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ Pair<SortedMap<Interval, String>, Map<Interval, List<DataSegment>>> checkSegment
// drop derivative segments which interval equals the interval in toDeleteBaseSegments
for (Interval interval : toDropInterval.keySet()) {
for (DataSegment segment : derivativeSegments.get(interval)) {
segmentManager.removeSegment(segment.getId());
segmentManager.removeSegment(segment.getId().toString());
}
}
// data of the latest interval will be built firstly.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void setup() throws IOException

expectedUsedSegments.forEach(s -> actionTestKit.getTaskLockbox().unlock(task, s.getInterval()));

expectedUnusedSegments.forEach(s -> actionTestKit.getMetadataSegmentManager().removeSegment(s.getId()));
expectedUnusedSegments.forEach(s -> actionTestKit.getMetadataSegmentManager().removeSegment(s.getId().toString()));
}

private DataSegment createSegment(Interval interval, String version)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@
"is_available": 1,
"is_realtime": 0,
"is_overshadowed": 0,
"payload": "{\"dataSegment\":{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space\":\"\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist,unique_users,variation\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":446027801,\"identifier\":\"auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\"},\"overshadowed\":false}"
"payload": "{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space\":\"\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist,unique_users,variation\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":446027801,\"identifier\":\"auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",\"overshadowed\":false}"
}
]
114 changes: 114 additions & 0 deletions server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.client;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Ordering;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* An immutable snapshot of fields from {@link org.apache.druid.metadata.SQLMetadataSegmentManager} (dataSources and
* overshadowedSegments). Getters of {@link org.apache.druid.metadata.MetadataSegmentManager} should use this snapshot
* to return dataSources and overshadowedSegments.
*/
public class DataSourcesSnapshot
{
private final Map<String, ImmutableDruidDataSource> dataSources;
private final ImmutableSet<SegmentId> overshadowedSegments;

public DataSourcesSnapshot(
Map<String, ImmutableDruidDataSource> dataSources
)
{
this.dataSources = dataSources;
this.overshadowedSegments = ImmutableSet.copyOf(determineOvershadowedSegments());
}

public Collection<ImmutableDruidDataSource> getDataSources()
{
return dataSources.values();
}

public Map<String, ImmutableDruidDataSource> getDataSourcesMap()
{
return dataSources;
}

@Nullable
public ImmutableDruidDataSource getDataSource(String dataSourceName)
{
return dataSources.get(dataSourceName);
}

public ImmutableSet<SegmentId> getOvershadowedSegments()
{
return overshadowedSegments;
}

@Nullable
public Iterable<DataSegment> iterateAllSegmentsInSnapshot()
{
if (dataSources == null) {
return null;
}
return () -> dataSources.values().stream()
.flatMap(dataSource -> dataSource.getSegments().stream())
.iterator();
}

/**
* This method builds timelines from all dataSources and finds the overshadowed segments list
*
* @return overshadowed segment Ids list
*/
private List<SegmentId> determineOvershadowedSegments()
{
final List<DataSegment> segments = dataSources.values().stream()
.flatMap(ds -> ds.getSegments().stream())
.collect(Collectors.toList());
final Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
segments.forEach(segment -> timelines
.computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural()))
.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)));

// It's fine to add all overshadowed segments to a single collection because only
// a small fraction of the segments in the cluster are expected to be overshadowed,
// so building this collection shouldn't generate a lot of garbage.
final List<SegmentId> overshadowedSegments = new ArrayList<>();
for (DataSegment dataSegment : segments) {
final VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(dataSegment.getDataSource());
if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) {
overshadowedSegments.add(dataSegment.getId());
}
}
return overshadowedSegments;
}

}
Loading