Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@
import com.google.common.primitives.Ints;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.*;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
Expand All @@ -54,6 +52,7 @@
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import io.druid.segment.realtime.firehose.FirehoseV2CloseBeforeStartException;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import io.druid.segment.realtime.plumber.Committers;
import io.druid.segment.realtime.plumber.Plumber;
Expand Down Expand Up @@ -116,6 +115,9 @@ private static String makeDatasource(FireDepartment fireDepartment)
@JsonIgnore
private volatile Firehose firehose = null;

@JsonIgnore
private volatile FirehoseV2 firehoseV2 = null;

@JsonIgnore
private volatile FireDepartmentMetrics metrics = null;

Expand Down Expand Up @@ -321,32 +323,70 @@ public String getVersion(final Interval interval)
Supplier<Committer> committerSupplier = null;

try {
plumber.startJob();
Object metadata = plumber.startJob();

// Set up metrics emission
toolbox.getMonitorScheduler().addMonitor(metricsMonitor);

// Delay firehose connection to avoid claiming input resources while the plumber is starting up.
final FirehoseFactory firehoseFactory = spec.getIOConfig().getFirehoseFactory();
final boolean firehoseDrainableByClosing = isFirehoseDrainableByClosing(firehoseFactory);
if (fireDepartment.checkFirehoseV2())
{
final boolean firehoseV2DrainableByClosing =
isFirehoseV2DrainableByClosing(spec.getIOConfig().getFirehoseFactoryV2());
boolean normalStart = true;

// Skip connecting firehose if we've been stopped before we got started.
synchronized (this) {
if (!gracefullyStopped) {
firehoseV2 = fireDepartment.connect(metadata);
committerSupplier = Committers.supplierFromFirehoseV2(firehoseV2);
try {
firehoseV2.start();
} catch (FirehoseV2CloseBeforeStartException e)
{
normalStart = false;
}
}
}

// Skip connecting firehose if we've been stopped before we got started.
synchronized (this) {
if (!gracefullyStopped) {
firehose = firehoseFactory.connect(spec.getDataSchema().getParser());
committerSupplier = Committers.supplierFromFirehose(firehose);
if (normalStart)
{
// Time to read data!
while (firehoseV2 != null && (!gracefullyStopped || firehoseV2DrainableByClosing))
{
Plumbers.addNextRowV2(
committerSupplier,
firehoseV2,
plumber,
tuningConfig.isReportParseExceptions(),
metrics
);

if (!firehoseV2.advance()) break;
}
}
} else {
// Delay firehose connection to avoid claiming input resources while the plumber is starting up.
final boolean firehoseDrainableByClosing =
isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory());

// Skip connecting firehose if we've been stopped before we got started.
synchronized (this) {
if (!gracefullyStopped) {
firehose = fireDepartment.connect();
committerSupplier = Committers.supplierFromFirehose(firehose);
}
}
}

// Time to read data!
while (firehose != null && (!gracefullyStopped || firehoseDrainableByClosing) && firehose.hasMore()) {
Plumbers.addNextRow(
committerSupplier,
firehose,
plumber,
tuningConfig.isReportParseExceptions(),
metrics
);
// Time to read data!
while (firehose != null && (!gracefullyStopped || firehoseDrainableByClosing) && firehose.hasMore()) {
Plumbers.addNextRow(
committerSupplier,
firehose,
plumber,
tuningConfig.isReportParseExceptions(),
metrics
);
}
}
}
catch (Throwable e) {
Expand All @@ -359,7 +399,7 @@ public String getVersion(final Interval interval)
if (normalExit) {
try {
// Persist if we had actually started.
if (firehose != null) {
if (firehose != null || firehoseV2 != null) {
log.info("Persisting remaining data.");

final Committer committer = committerSupplier.get();
Expand Down Expand Up @@ -417,6 +457,9 @@ public void run()
if (firehose != null) {
CloseQuietly.close(firehose);
}
if (firehoseV2 != null) {
CloseQuietly.close(firehoseV2);
}
toolbox.getMonitorScheduler().removeMonitor(metricsMonitor);
}
}
Expand All @@ -439,14 +482,17 @@ public void stopGracefully()
synchronized (this) {
if (!gracefullyStopped) {
gracefullyStopped = true;
if (firehose == null) {
if (firehose == null && firehoseV2 == null) {
log.info("stopGracefully: Firehose not started yet, so nothing to stop.");
} else if (finishingJob) {
log.info("stopGracefully: Interrupting finishJob.");
runThread.interrupt();
} else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
} else if (firehose != null && isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
log.info("stopGracefully: Draining firehose.");
firehose.close();
} else if (firehoseV2 != null && isFirehoseV2DrainableByClosing(spec.getIOConfig().getFirehoseFactoryV2())) {
log.info("stopGracefully: Draining firehoseV2.");
firehoseV2.close();
} else {
log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread.");
runThread.interrupt();
Expand All @@ -468,6 +514,15 @@ public Firehose getFirehose()
return firehose;
}

/**
* Public for tests.
*/
@JsonIgnore
public FirehoseV2 getFirehoseV2()
{
return firehoseV2;
}

/**
* Public for tests.
*/
Expand Down Expand Up @@ -500,6 +555,19 @@ && isFirehoseDrainableByClosing(((TimedShutoffFirehoseFactory) firehoseFactory).
&& isFirehoseDrainableByClosing(((ClippedFirehoseFactory) firehoseFactory).getDelegate()));
}

/**
* Is a firehoseV2 from this factory drainable by closing it? If so, we should drain on stopGracefully rather than
* abruptly stopping.
*
* This is a hack to get around the fact that the FirehoseV2 and FirehoseFactoryV2 interfaces do not help us do this.
* And, currently no FirehoseFactoryV2 implementation supports drainable by closing yet.
* Protected for tests.
*/
protected boolean isFirehoseV2DrainableByClosing(FirehoseFactoryV2 firehoseFactoryV2)
{
return false;
}

public static class TaskActionSegmentPublisher implements SegmentPublisher
{
final Task task;
Expand Down
Loading