Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions api/src/main/java/io/druid/timeline/TaskDataSegment.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package io.druid.timeline;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing license header

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@haoxiang47 there are many whitespace changes, can we please revert them?
https://github.com/druid-io/druid/pull/3831/files?w=1 indicates there's only new code added

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Interval;

import java.util.Map;

/**
* Created by haoxiang on 16/11/11.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we generally dont have author info

*/
public class TaskDataSegment
{

private final String type;
private final String id;
private final String groupId;
private final String dataSource;
private final Map<String, Object> ioConfig;
private final Interval interval;

@JsonCreator
public TaskDataSegment(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure this class is needed? What is the class used for serde when writing to pending Segments table?

@JsonProperty("type") String type,
@JsonProperty("id") String id,
@JsonProperty("groupId") String groupId,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("ioConfig") Map<String, Object> ioConfig,
@JsonProperty("interval") Interval interval
)
{
this.type = type;
this.id = id;
this.groupId = groupId;
this.dataSource = dataSource;
this.ioConfig = ioConfig;
this.interval = interval;
}

/**
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a weird comment

* Get dataSource
*
* @return the dataSource
*/

@JsonProperty
public String getType()
{
return type;
}

@JsonProperty
public String getId()
{
return id;
}

@JsonProperty
public String getGroupId()
{
return groupId;
}

@JsonProperty
public String getDataSource()
{
return dataSource;
}

@JsonProperty
public Map<String, Object> getIoConfig()
{
return ioConfig;
}

@JsonProperty
public Interval getInterval()
{
return interval;
}

@Override
public String toString()
{
return "TaskDataSegment{" +
"type=" + type +
", id=" + id +
", groupId=" + groupId +
", dataSource=" + dataSource +
'}';
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TaskDataSegment;
import org.joda.time.Interval;

import java.io.IOException;
Expand All @@ -39,10 +40,12 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
final private Set<DataSegment> published = Sets.newConcurrentHashSet();
final private Set<DataSegment> nuked = Sets.newConcurrentHashSet();
final private List<DataSegment> unusedSegments;
final private List<TaskDataSegment> taskDataSegments;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inactiveTaskSegments


public TestIndexerMetadataStorageCoordinator()
{
unusedSegments = Lists.newArrayList();
taskDataSegments = Lists.newArrayList();
}

@Override
Expand All @@ -65,7 +68,7 @@ public List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval

@Override
public List<DataSegment> getUsedSegmentsForIntervals(
String dataSource, List<Interval> intervals
String dataSource, List<Interval> intervals
) throws IOException
{
return ImmutableList.of();
Expand Down Expand Up @@ -93,9 +96,9 @@ public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments)

@Override
public SegmentPublishResult announceHistoricalSegments(
Set<DataSegment> segments,
DataSourceMetadata oldCommitMetadata,
DataSourceMetadata newCommitMetadata
Set<DataSegment> segments,
DataSourceMetadata oldCommitMetadata,
DataSourceMetadata newCommitMetadata
) throws IOException
{
// Don't actually compare metadata, just do it!
Expand All @@ -104,11 +107,11 @@ public SegmentPublishResult announceHistoricalSegments(

@Override
public SegmentIdentifier allocatePendingSegment(
String dataSource,
String sequenceName,
String previousSegmentId,
Interval interval,
String maxVersion
String dataSource,
String sequenceName,
String previousSegmentId,
Interval interval,
String maxVersion
) throws IOException
{
throw new UnsupportedOperationException();
Expand All @@ -120,6 +123,12 @@ public void deleteSegments(Set<DataSegment> segments)
nuked.addAll(segments);
}

@Override
public List<TaskDataSegment> getNotActiveTask(final Interval interval){
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InactiveTasks*


return ImmutableList.copyOf(taskDataSegments);
}

@Override
public void updateSegmentMetadata(Set<DataSegment> segments) throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TaskDataSegment;
import org.joda.time.Interval;

import java.io.IOException;
Expand All @@ -42,7 +43,7 @@ public interface IndexerMetadataStorageCoordinator
* @throws IOException
*/
List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval)
throws IOException;
throws IOException;

/**
* Get all segments which may include any data in the interval and are flagged as used.
Expand All @@ -55,7 +56,7 @@ List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interva
* @throws IOException
*/
List<DataSegment> getUsedSegmentsForIntervals(final String dataSource, final List<Interval> intervals)
throws IOException;
throws IOException;

/**
* Attempts to insert a set of segments to the metadata storage. Returns the set of segments actually added (segments
Expand Down Expand Up @@ -86,11 +87,11 @@ List<DataSegment> getUsedSegmentsForIntervals(final String dataSource, final Lis
* @return the pending segment identifier, or null if it was impossible to allocate a new segment
*/
SegmentIdentifier allocatePendingSegment(
String dataSource,
String sequenceName,
String previousSegmentId,
Interval interval,
String maxVersion
String dataSource,
String sequenceName,
String previousSegmentId,
Interval interval,
String maxVersion
) throws IOException;

/**
Expand All @@ -113,9 +114,9 @@ SegmentIdentifier allocatePendingSegment(
* @throws IllegalArgumentException if startMetadata and endMetadata are not either both null or both non-null
*/
SegmentPublishResult announceHistoricalSegments(
Set<DataSegment> segments,
DataSourceMetadata startMetadata,
DataSourceMetadata endMetadata
Set<DataSegment> segments,
DataSourceMetadata startMetadata,
DataSourceMetadata endMetadata
) throws IOException;

DataSourceMetadata getDataSourceMetadata(String dataSource);
Expand All @@ -141,4 +142,11 @@ SegmentPublishResult announceHistoricalSegments(
* @return DataSegments which include ONLY data within the requested interval and are not flagged as used. Data segments NOT returned here may include data in the interval
*/
List<DataSegment> getUnusedSegmentsForInterval(String dataSource, Interval interval);

/**
* Get all task which active property is 0, is used to delete the useless segments in pendingSegment table.
* @param interval Filter the tasks to ones that include tasks in this interval exclusively. Start is inclusive, end is exclusive
* @return The TaskDataSegment list which used to match the pendingSegment table's payload
*/
List<TaskDataSegment> getNotActiveTask(final Interval interval);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InactiveTasks

}
Loading