Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant;
import io.druid.segment.realtime.appenderator.SegmentAllocator;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.segment.realtime.plumber.Sink;
Expand Down Expand Up @@ -93,6 +94,7 @@ public YeOldePlumberSchool(

@Override
public Plumber findPlumber(
final SegmentAllocator segmentAllocator,
final DataSchema schema,
final RealtimeTuningConfig config,
final FireDepartmentMetrics metrics
Expand Down Expand Up @@ -123,7 +125,7 @@ public Object startJob()
}

@Override
public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
public int add(InputRow row, String sequenceName, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
{
Sink sink = getSink(row.getTimestampFromEpoch());
if (sink == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
Expand All @@ -57,6 +57,7 @@
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.appenderator.SegmentAllocator;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
Expand All @@ -65,12 +66,10 @@
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.segment.realtime.plumber.Plumbers;
import io.druid.segment.realtime.plumber.RealtimePlumberSchool;
import io.druid.segment.realtime.plumber.VersioningPolicy;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -261,35 +260,10 @@ public void unannounceSegments(Iterable<DataSegment> segments) throws IOExceptio
}
};

// NOTE: getVersion will block if there is lock contention, which will block plumber.getSink
// NOTE: (and thus the firehose)

// Shouldn't usually happen, since we don't expect people to submit tasks that intersect with the
// realtime window, but if they do it can be problematic. If we decide to care, we can use more threads in
// the plumber such that waiting for the coordinator doesn't block data processing.
final VersioningPolicy versioningPolicy = new VersioningPolicy()
{
@Override
public String getVersion(final Interval interval)
{
try {
// Side effect: Calling getVersion causes a lock to be acquired
final TaskLock myLock = toolbox.getTaskActionClient()
.submit(new LockAcquireAction(interval, lockTimeoutMs));

return myLock.getVersion();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
};

DataSchema dataSchema = spec.getDataSchema();
RealtimeIOConfig realtimeIOConfig = spec.getIOConfig();
RealtimeTuningConfig tuningConfig = spec.getTuningConfig()
.withBasePersistDirectory(toolbox.getPersistDir())
.withVersioningPolicy(versioningPolicy);
.withBasePersistDirectory(toolbox.getPersistDir());

final FireDepartment fireDepartment = new FireDepartment(
dataSchema,
Expand All @@ -305,6 +279,8 @@ public String getVersion(final Interval interval)
);
this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate();

final SegmentAllocator segmentAllocator = createSegmentAllocator(toolbox);

// NOTE: This pusher selects path based purely on global configuration and the DataSegment, which means
// NOTE: that redundant realtime tasks will upload to the same location. This can cause index.zip
// NOTE: (partitionNum_index.zip for HDFS data storage) and descriptor.json (partitionNum_descriptor.json for
Expand All @@ -325,7 +301,7 @@ public String getVersion(final Interval interval)
toolbox.getObjectMapper()
);

this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, metrics);
this.plumber = plumberSchool.findPlumber(segmentAllocator, dataSchema, tuningConfig, metrics);

Supplier<Committer> committerSupplier = null;
final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
Expand Down Expand Up @@ -367,10 +343,13 @@ public String getVersion(final Interval interval)
}
}

final String sequenceName = getId();

// Time to read data!
while (firehose != null && (!gracefullyStopped || firehoseDrainableByClosing) && firehose.hasMore()) {
Plumbers.addNextRow(
committerSupplier,
sequenceName,
firehose,
plumber,
tuningConfig.isReportParseExceptions(),
Expand Down Expand Up @@ -532,6 +511,11 @@ && isFirehoseDrainableByClosing(((TimedShutoffFirehoseFactory) firehoseFactory).
&& isFirehoseDrainableByClosing(((ClippedFirehoseFactory) firehoseFactory).getDelegate()));
}

protected SegmentAllocator createSegmentAllocator(TaskToolbox toolbox)
{
return new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), spec.getDataSchema());
}

public static class TaskActionSegmentPublisher implements SegmentPublisher
{
final Task task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.appenderator.SegmentAllocator;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
Expand Down Expand Up @@ -63,7 +64,7 @@ public TestRealtimeTask(
{
@Override
public Plumber findPlumber(
DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
SegmentAllocator segmentAllocator, DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
)
{
return null;
Expand Down
Loading