Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,19 @@ public class TimelineObjectHolder<VersionType, ObjectType> implements LogicalSeg
{
private final Interval interval;
private final VersionType version;
private final long approxSize;
private final PartitionHolder<ObjectType> object;

/**
 * Creates a holder that ties a partition holder to the interval and version it is reported under.
 *
 * @param interval   interval this holder covers
 * @param version    version associated with the held object
 * @param approxSize approximate size value carried alongside the object; stored as given, not validated
 * @param object     partition holder wrapped by this instance
 */
public TimelineObjectHolder(
    Interval interval,
    VersionType version,
    long approxSize,
    PartitionHolder<ObjectType> object
)
{
  // Plain field capture: nothing is copied or checked here.
  this.object = object;
  this.approxSize = approxSize;
  this.version = version;
  this.interval = interval;
}

Expand All @@ -52,6 +55,11 @@ public VersionType getVersion()
return version;
}

/**
 * @return the approximate size value handed to the constructor, returned unchanged
 */
public long getApproximatedSize()
{
  return this.approxSize;
}

public PartitionHolder<ObjectType> getObject()
{
return object;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,17 @@

/**
* VersionedIntervalTimeline is a data structure that manages objects on a specific timeline.
*
* <p/>
* It associates a jodatime Interval and a generically-typed version with the object that is being stored.
*
* <p/>
* In the event of overlapping timeline entries, timeline intervals may be chunked. The underlying data associated
* with a timeline entry remains unchanged when chunking occurs.
*
* <p/>
* After loading objects via the add() method, the lookup(Interval) method can be used to get the list of the most
* recent objects (according to the version) that match the given interval. The intent is that objects represent
* a certain time period and when you do a lookup(), you are asking for all of the objects that you need to look
* at in order to get a correct answer about that time period.
*
* <p/>
* The findOvershadowed() method returns a list of objects that will never be returned by a call to lookup() because
* they are overshadowed by some other object. This can be used in conjunction with the add() and remove() methods
* to achieve "atomic" updates. First add new items, then check if those items caused anything to be overshadowed, if
Expand Down Expand Up @@ -81,6 +81,11 @@ public VersionedIntervalTimeline(
}

/**
 * Adds a chunk to the timeline when the caller has no approximate size to report.
 * Delegates to the four-argument overload, passing {@code -1} as the size.
 *
 * @param interval interval the chunk belongs to
 * @param version  version of the chunk
 * @param object   partition chunk to register
 */
public void add(final Interval interval, VersionType version, PartitionChunk<ObjectType> object)
{
  // -1 is the value recorded when no size is given (presumably read as "unknown" -- confirm with consumers).
  final long sizeNotProvided = -1;
  add(interval, version, object, sizeNotProvided);
}

public void add(final Interval interval, VersionType version, PartitionChunk<ObjectType> object, long approxSize)
{
try {
lock.writeLock().lock();
Expand All @@ -89,15 +94,15 @@ public void add(final Interval interval, VersionType version, PartitionChunk<Obj
TimelineEntry entry = null;

if (exists == null) {
entry = new TimelineEntry(interval, version, new PartitionHolder<ObjectType>(object));
entry = new TimelineEntry(interval, version, new PartitionHolder<ObjectType>(object), approxSize);
TreeMap<VersionType, TimelineEntry> versionEntry = new TreeMap<VersionType, TimelineEntry>(versionComparator);
versionEntry.put(version, entry);
allTimelineEntries.put(interval, versionEntry);
} else {
entry = exists.get(version);

if (entry == null) {
entry = new TimelineEntry(interval, version, new PartitionHolder<ObjectType>(object));
entry = new TimelineEntry(interval, version, new PartitionHolder<ObjectType>(object), approxSize);
exists.put(version, entry);
} else {
PartitionHolder<ObjectType> partitionHolder = entry.getPartitionHolder();
Expand Down Expand Up @@ -179,7 +184,7 @@ public PartitionHolder<ObjectType> findEntry(Interval interval, VersionType vers
* @param interval interval to find objects for
*
* @return Holders representing the interval that the objects exist for, PartitionHolders
* are guaranteed to be complete
* are guaranteed to be complete
*/
public List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval)
{
Expand Down Expand Up @@ -244,6 +249,7 @@ public Set<TimelineObjectHolder<VersionType, ObjectType>> findOvershadowed()
new TimelineObjectHolder<VersionType, ObjectType>(
object.getTrueInterval(),
object.getVersion(),
object.getApproximatedSize(),
object.getPartitionHolder()
)
);
Expand Down Expand Up @@ -293,10 +299,10 @@ private void add(
}

/**
*
* @param timeline
* @param key
* @param entry
*
* @return boolean flag indicating whether or not we inserted or discarded something
*/
private boolean addAtKey(
Expand Down Expand Up @@ -446,6 +452,7 @@ private List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval inte
new TimelineObjectHolder<VersionType, ObjectType>(
timelineInterval,
val.getVersion(),
val.getApproximatedSize(),
val.getPartitionHolder()
)
);
Expand All @@ -464,6 +471,7 @@ private List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval inte
new TimelineObjectHolder<VersionType, ObjectType>(
new Interval(interval.getStart(), firstEntry.getInterval().getEnd()),
firstEntry.getVersion(),
firstEntry.getApproximatedSize(),
firstEntry.getObject()
)
);
Expand All @@ -476,6 +484,7 @@ private List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval inte
new TimelineObjectHolder<VersionType, ObjectType>(
new Interval(lastEntry.getInterval().getStart(), interval.getEnd()),
lastEntry.getVersion(),
lastEntry.getApproximatedSize(),
lastEntry.getObject()
)
);
Expand All @@ -489,12 +498,19 @@ public class TimelineEntry
private final Interval trueInterval;
private final VersionType version;
private final PartitionHolder<ObjectType> partitionHolder;

public TimelineEntry(Interval trueInterval, VersionType version, PartitionHolder<ObjectType> partitionHolder)
private final long approxSize;

/**
 * Creates a timeline entry from its four components; each argument is stored as given.
 *
 * @param trueInterval    interval recorded for this entry
 * @param version         version recorded for this entry
 * @param partitionHolder holder of the partition chunks belonging to this entry
 * @param approxSize      approximate size value kept with the entry; not validated
 */
public TimelineEntry(
    Interval trueInterval,
    VersionType version,
    PartitionHolder<ObjectType> partitionHolder,
    long approxSize
)
{
  // Straight assignment of every field; no defensive copies are made.
  this.approxSize = approxSize;
  this.partitionHolder = partitionHolder;
  this.version = version;
  this.trueInterval = trueInterval;
}

public Interval getTrueInterval()
Expand All @@ -511,5 +527,10 @@ public PartitionHolder<ObjectType> getPartitionHolder()
{
return partitionHolder;
}

/**
 * @return the approximate size value this entry was constructed with, returned unchanged
 */
public long getApproximatedSize()
{
  return this.approxSize;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1552,7 +1552,7 @@ private void add(String interval, String version, PartitionChunk<Integer> value)

/**
 * Helper that registers a single chunk with the timeline under test.
 *
 * @param interval interval the chunk belongs to
 * @param version  version string of the chunk
 * @param value    partition chunk to register
 */
private void add(Interval interval, String version, PartitionChunk<Integer> value)
{
  // Register the chunk exactly once. The three-argument overload forwards the same
  // arguments with a size of -1, so calling both forms would add the chunk twice.
  timeline.add(interval, version, value, -1);
}

private void assertValues(
Expand Down
197 changes: 197 additions & 0 deletions extensions-core/hive-extensions/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright 2012 - 2015 Metamarkets Group Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>io.druid.extensions</groupId>
<artifactId>druid-hive-extensions</artifactId>
<name>druid-hive-extensions</name>
<description>druid-hive-extensions</description>

<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.9.1-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-common</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-indexing-hadoop</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.0.0</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
<exclusion>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</exclusion>
<exclusion>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>provided</scope>
</dependency>

<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.compile.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.compile.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.compile.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Loading