Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.druid.timeline;

import org.apache.druid.timeline.partition.PartitionHolder;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -51,5 +51,9 @@ public interface TimelineLookup<VersionType, ObjectType extends Overshadowable<O
*/
List<TimelineObjectHolder<VersionType, ObjectType>> lookupWithIncompletePartitions(Interval interval);

@Nullable PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version);
/**
* Finds the {@link PartitionChunk} for the given time interval, version and chunk number.
*/
@Nullable
PartitionChunk<ObjectType> findChunk(Interval interval, VersionType version, int partitionNum);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.timeline;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterators;
Expand Down Expand Up @@ -117,10 +116,14 @@ public static void addSegments(
)
{
timeline.addAll(
Iterators.transform(segments, segment -> segment.getShardSpec().createChunk(segment)),
DataSegment::getInterval,
DataSegment::getVersion
);
Iterators.transform(
segments,
segment -> new PartitionChunkEntry<>(
segment.getInterval(),
segment.getVersion(),
segment.getShardSpec().createChunk(segment)
)
));
}

public Map<Interval, TreeMap<VersionType, TimelineEntry>> getAllTimelineEntries()
Expand Down Expand Up @@ -183,13 +186,11 @@ public Set<ObjectType> findNonOvershadowedObjectsInInterval(Interval interval, P

public void add(final Interval interval, VersionType version, PartitionChunk<ObjectType> object)
{
addAll(Iterators.singletonIterator(object), o -> interval, o -> version);
addAll(Iterators.singletonIterator(new PartitionChunkEntry<>(interval, version, object)));
}

private void addAll(
final Iterator<PartitionChunk<ObjectType>> objects,
final Function<ObjectType, Interval> intervalFunction,
final Function<ObjectType, VersionType> versionFunction
public void addAll(
final Iterator<PartitionChunkEntry<VersionType, ObjectType>> objects
)
{
lock.writeLock().lock();
Expand All @@ -198,9 +199,10 @@ private void addAll(
final IdentityHashMap<TimelineEntry, Interval> allEntries = new IdentityHashMap<>();

while (objects.hasNext()) {
PartitionChunk<ObjectType> object = objects.next();
Interval interval = intervalFunction.apply(object.getObject());
VersionType version = versionFunction.apply(object.getObject());
PartitionChunkEntry<VersionType, ObjectType> chunkEntry = objects.next();
PartitionChunk<ObjectType> object = chunkEntry.getChunk();
Interval interval = chunkEntry.getInterval();
VersionType version = chunkEntry.getVersion();
Map<VersionType, TimelineEntry> exists = allTimelineEntries.get(interval);
TimelineEntry entry;

Expand Down Expand Up @@ -284,15 +286,15 @@ public PartitionChunk<ObjectType> remove(Interval interval, VersionType version,

@Override
@Nullable
public PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version)
public PartitionChunk<ObjectType> findChunk(Interval interval, VersionType version, int partitionNum)
{
lock.readLock().lock();
try {
for (Entry<Interval, TreeMap<VersionType, TimelineEntry>> entry : allTimelineEntries.entrySet()) {
if (entry.getKey().equals(interval) || entry.getKey().contains(interval)) {
TimelineEntry foundEntry = entry.getValue().get(version);
if (foundEntry != null) {
return foundEntry.getPartitionHolder().asImmutable();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the asImmutable method still needed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like its not. will delete this method as well as the class ImmutablePartitionHolder.

return foundEntry.getPartitionHolder().getChunk(partitionNum);
}
}
}
Expand Down Expand Up @@ -849,4 +851,41 @@ public int hashCode()
return Objects.hash(trueInterval, version, partitionHolder);
}
}

/**
* Stores a {@link PartitionChunk} for a given interval and version. The
* interval corresponds to the {@link LogicalSegment#getInterval()}
*/
public static class PartitionChunkEntry<VersionType, ObjectType>
{
private final Interval interval;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add javadoc explaining what this interval is. There are two types of interval in timeline. See LogicalSegment.

private final VersionType version;
private final PartitionChunk<ObjectType> chunk;

public PartitionChunkEntry(
Interval interval,
VersionType version,
PartitionChunk<ObjectType> chunk
)
{
this.interval = interval;
this.version = version;
this.chunk = chunk;
}

public Interval getInterval()
{
return interval;
}

public VersionType getVersion()
{
return version;
}

public PartitionChunk<ObjectType> getChunk()
{
return chunk;
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,6 @@ protected PartitionHolder(OvershadowableManager<T> overshadowableManager)
this.overshadowableManager = overshadowableManager;
}

public ImmutablePartitionHolder<T> asImmutable()
{
return new ImmutablePartitionHolder<>(OvershadowableManager.copyVisible(overshadowableManager));
}

public boolean add(PartitionChunk<T> chunk)
{
return overshadowableManager.addChunk(chunk);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

/**
*/
@Deprecated
public class SingleElementPartitionChunk<T> implements PartitionChunk<T>
{
private final T element;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.partition.IntegerPartitionChunk;
import org.apache.druid.timeline.partition.OvershadowableInteger;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.joda.time.DateTime;
import org.joda.time.Days;
import org.joda.time.Hours;
Expand Down Expand Up @@ -221,36 +220,64 @@ public void testRemove()
}

@Test
public void testFindEntry()
public void testFindChunk()
{
Assert.assertEquals(
new PartitionHolder<>(makeSingle("1", 1)).asImmutable(),
timeline.findEntry(Intervals.of("2011-10-01/2011-10-02"), "1")
assertSingleElementChunks(
makeSingle("1", 1),
timeline.findChunk(Intervals.of("2011-10-01/2011-10-02"), "1", 0)
);

Assert.assertEquals(
new PartitionHolder<>(makeSingle("1", 1)).asImmutable(),
timeline.findEntry(Intervals.of("2011-10-01/2011-10-01T10"), "1")
assertSingleElementChunks(
makeSingle("1", 1),
timeline.findChunk(Intervals.of("2011-10-01/2011-10-01T10"), "1", 0)
);

assertSingleElementChunks(
makeSingle("1", 1),
timeline.findChunk(Intervals.of("2011-10-01T02/2011-10-02"), "1", 0)
);

assertSingleElementChunks(
makeSingle("1", 1),
timeline.findChunk(Intervals.of("2011-10-01T04/2011-10-01T17"), "1", 0)
);

IntegerPartitionChunk<OvershadowableInteger> expected = IntegerPartitionChunk.make(
10,
null,
1,
new OvershadowableInteger(
"3",
1,
21
)
);
IntegerPartitionChunk<OvershadowableInteger> actual = (IntegerPartitionChunk<OvershadowableInteger>) timeline.findChunk(
Intervals.of("2011-10-02/2011-10-03"),
"3",
1
);
Assert.assertEquals(expected, actual);
Assert.assertEquals(expected.getObject(), actual.getObject());

Assert.assertEquals(
new PartitionHolder<>(makeSingle("1", 1)).asImmutable(),
timeline.findEntry(Intervals.of("2011-10-01T02/2011-10-02"), "1")
null,
timeline.findChunk(Intervals.of("2011-10-01T04/2011-10-01T17"), "1", 1)
);

Assert.assertEquals(
new PartitionHolder<>(makeSingle("1", 1)).asImmutable(),
timeline.findEntry(Intervals.of("2011-10-01T04/2011-10-01T17"), "1")
null,
timeline.findChunk(Intervals.of("2011-10-01T04/2011-10-01T17"), "2", 0)
);

Assert.assertEquals(
null,
timeline.findEntry(Intervals.of("2011-10-01T04/2011-10-01T17"), "2")
timeline.findChunk(Intervals.of("2011-10-01T04/2011-10-02T17"), "1", 0)
);

Assert.assertEquals(
null,
timeline.findEntry(Intervals.of("2011-10-01T04/2011-10-02T17"), "1")
timeline.findChunk(Intervals.of("2011-10-01T04/2011-10-02T17"), "1", 0)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ public void setUp()
}

@Test
public void testFindEntryWithOverlap()
public void testFindChunkWithOverlap()
{
add("2011-01-01/2011-01-10", "1", 1);
add("2011-01-02/2011-01-05", "2", 1);

Assert.assertEquals(
new PartitionHolder<>(makeSingle("1", 1)).asImmutable(),
timeline.findEntry(Intervals.of("2011-01-02T02/2011-01-04"), "1")
assertSingleElementChunks(
makeSingle("1", 1),
timeline.findChunk(Intervals.of("2011-01-02T02/2011-01-04"), "1", 0)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ static void assertValues(
Assert.assertEquals(expected, actualSet);
}

static void assertSingleElementChunks(
PartitionChunk<OvershadowableInteger> expected,
PartitionChunk<OvershadowableInteger> actual
)
{
SingleElementPartitionChunk<OvershadowableInteger> expectedSingle = (SingleElementPartitionChunk<OvershadowableInteger>) expected;
Copy link
Copy Markdown
Contributor

@jihoonson jihoonson Mar 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SingleElementPartitionChunk is used only by deprecated things such as NoneShardSpec and Tranquility. We should deprecate SingleElementPartitionChunk as well and stop using it. You can use NumberedPartitionChunk instead.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it's unrelated to my change, I will leave this one for now as it is. Though I did mark SingleElementPartitionChunk deprecated.

SingleElementPartitionChunk<OvershadowableInteger> actualSingle = (SingleElementPartitionChunk<OvershadowableInteger>) actual;
Assert.assertEquals(expectedSingle.getObject(), actualSingle.getObject());
}

static VersionedIntervalTimeline<String, OvershadowableInteger> makeStringIntegerTimeline()
{
return new VersionedIntervalTimeline<>(Ordering.natural());
Expand Down
Loading