@@ -122,7 +122,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no|
|`buildV9Directly`|Boolean|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|no (default == true)|
|`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)|
-|`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)|
+|`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever. This option is deprecated. Use `completionTimeout` of KafkaSupervisorIOConfig instead.|no (default == 0)|
|`resetOffsetAutomatically`|Boolean|Whether to reset the consumer offset if the next offset that it is trying to fetch is less than the earliest available offset for that particular partition. The consumer offset will be reset to either the earliest or latest offset depending on `useEarliestOffset` property of `KafkaSupervisorIOConfig` (see below). This situation typically occurs when messages in Kafka are no longer available for consumption and therefore won't be ingested into Druid. If set to false then ingestion for that particular partition will halt and manual intervention is required to correct the situation, please see `Reset Supervisor` API below.|no (default == false)|
|`workerThreads`|Integer|The number of threads that will be used by the supervisor for asynchronous operations.|no (default == min(10, taskCount))|
|`chatThreads`|Integer|The number of threads that will be used for communicating with indexing tasks.|no (default == min(10, taskCount * replicas))|
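Since `handoffConditionTimeout` is deprecated for supervised tasks, the equivalent control now lives on the supervisor's `ioConfig`. A minimal sketch of where `completionTimeout` sits in a supervisor spec (all field values are illustrative, and the `dataSchema` is elided):

```json
{
  "type": "kafka",
  "dataSchema": { },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 5000000
  },
  "ioConfig": {
    "topic": "metrics",
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H",
    "completionTimeout": "PT30M"
  }
}
```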
docs/content/ingestion/tasks.md (1 addition & 0 deletions)
@@ -120,6 +120,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|buildV9Directly|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|true|no|
|forceExtendableShardSpecs|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|false|no|
|reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no|
+|publishTimeout|Milliseconds to wait for publishing segments. It must be >= 0, where 0 means to wait forever.|0|no|
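For reference, a hedged sketch of an index task `tuningConfig` that sets the new field (values are illustrative; `0` keeps the default wait-forever behavior):

```json
"tuningConfig": {
  "type": "index",
  "targetPartitionSize": 5000000,
  "maxRowsInMemory": 75000,
  "publishTimeout": 600000
}
```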

#### IndexSpec

@@ -65,9 +65,9 @@
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.appenderator.Appenderator;
+import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import io.druid.segment.realtime.appenderator.Appenderators;
import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver;
-import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import io.druid.segment.realtime.firehose.ChatHandler;
@@ -92,11 +92,11 @@
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
-import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@@ -434,13 +434,20 @@ public void run()
           if (!ioConfig.getMinimumMessageTime().isPresent() ||
               !ioConfig.getMinimumMessageTime().get().isAfter(row.getTimestamp())) {

-            final SegmentIdentifier identifier = driver.add(
+            final String sequenceName = sequenceNames.get(record.partition());
+            final AppenderatorDriverAddResult addResult = driver.add(
                 row,
-                sequenceNames.get(record.partition()),
+                sequenceName,
                 committerSupplier
             );

-            if (identifier == null) {
+            if (addResult.isOk()) {
+              // If the number of rows in the segment exceeds the threshold after adding a row,
+              // move the segment out from the active segments of AppenderatorDriver to make a new segment.
+              if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) {
+                driver.moveSegmentOut(sequenceName, ImmutableList.of(addResult.getSegmentIdentifier()));
+              }
+            } else {
               // Failure to allocate segment puts determinism at risk, bail out to be safe.
               // May want configurable behavior here at some point.
               // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
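Read outside the diff, the new contract is: `driver.add` reports how many rows the active segment holds, and the caller retires the segment once it crosses `maxRowsPerSegment` so the next row opens a fresh one. A self-contained toy model of that pattern (plain JDK; all names hypothetical, not Druid code):

```java
import java.util.ArrayList;
import java.util.List;

public class MoveSegmentOutExample
{
  static final int MAX_ROWS_PER_SEGMENT = 3;

  public static void main(String[] args)
  {
    List<Integer> activeSegment = new ArrayList<>();        // stands in for the driver's active segment
    List<List<Integer>> retiredSegments = new ArrayList<>();

    for (int row = 0; row < 10; row++) {
      activeSegment.add(row);                               // driver.add(...)
      int numRowsInSegment = activeSegment.size();          // addResult.getNumRowsInSegment()

      // The threshold is checked after each add, mirroring the task loop above,
      // so a segment is retired on the first add that pushes it past the limit.
      if (numRowsInSegment > MAX_ROWS_PER_SEGMENT) {
        retiredSegments.add(new ArrayList<>(activeSegment)); // driver.moveSegmentOut(...)
        activeSegment.clear();
      }
    }

    System.out.printf("retired=%d segments, %d rows still active%n",
                      retiredSegments.size(), activeSegment.size());
  }
}
```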
@@ -491,48 +498,59 @@ public void run()
        status = Status.PUBLISHING;
      }

-      final TransactionalSegmentPublisher publisher = new TransactionalSegmentPublisher()
-      {
-        @Override
-        public boolean publishSegments(Set<DataSegment> segments, Object commitMetadata) throws IOException
-        {
-          final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue(
-              ((Map) commitMetadata).get(METADATA_NEXT_PARTITIONS),
-              KafkaPartitions.class
-          );
+      final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
+        final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue(
+            ((Map) commitMetadata).get(METADATA_NEXT_PARTITIONS),
+            KafkaPartitions.class
+        );

-          // Sanity check, we should only be publishing things that match our desired end state.
-          if (!endOffsets.equals(finalPartitions.getPartitionOffsetMap())) {
-            throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", commitMetadata);
-          }
+        // Sanity check, we should only be publishing things that match our desired end state.
+        if (!endOffsets.equals(finalPartitions.getPartitionOffsetMap())) {
+          throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", commitMetadata);
+        }

-          final SegmentTransactionalInsertAction action;
+        final SegmentTransactionalInsertAction action;

-          if (ioConfig.isUseTransaction()) {
-            action = new SegmentTransactionalInsertAction(
-                segments,
-                new KafkaDataSourceMetadata(ioConfig.getStartPartitions()),
-                new KafkaDataSourceMetadata(finalPartitions)
-            );
-          } else {
-            action = new SegmentTransactionalInsertAction(segments, null, null);
-          }
+        if (ioConfig.isUseTransaction()) {
+          action = new SegmentTransactionalInsertAction(
+              segments,
+              new KafkaDataSourceMetadata(ioConfig.getStartPartitions()),
+              new KafkaDataSourceMetadata(finalPartitions)
+          );
+        } else {
+          action = new SegmentTransactionalInsertAction(segments, null, null);
+        }

-          log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction());
+        log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction());

-          return toolbox.getTaskActionClient().submit(action).isSuccess();
-        }
-      };
+        return toolbox.getTaskActionClient().submit(action).isSuccess();
+      };

-      final SegmentsAndMetadata published = driver.finish(publisher, committerSupplier.get());
-      if (published == null) {
+      // Supervised kafka tasks are killed by KafkaSupervisor if they are stuck during publishing segments or waiting
+      // for hand off. See KafkaSupervisorIOConfig.completionTimeout.
+      final SegmentsAndMetadata published = driver.publish(
+          publisher,
+          committerSupplier.get(),
+          sequenceNames.values()
+      ).get();
+
+      final SegmentsAndMetadata handedOff;
+      if (tuningConfig.getHandoffConditionTimeout() == 0) {
+        handedOff = driver.registerHandoff(published)
+                          .get();
+      } else {
+        handedOff = driver.registerHandoff(published)
+                          .get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
+      }
+
+      if (handedOff == null) {
        throw new ISE("Transaction failure publishing segments, aborting");
      } else {
        log.info(
            "Published segments[%s] with metadata[%s].",
            Joiner.on(", ").join(
                Iterables.transform(
-                    published.getSegments(),
+                    handedOff.getSegments(),
                    new Function<DataSegment, String>()
                    {
                      @Override
@@ -543,7 +561,7 @@ public String apply(DataSegment input)
                    }
                )
            ),
-            published.getCommitMetadata()
+            handedOff.getCommitMetadata()
        );
      }
    }
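The single `driver.finish()` call is thus split into an unbounded publish phase and a handoff phase that honors the deprecated `handoffConditionTimeout`. A plain-JDK analogue of that control flow (not the Druid API; names and values are illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class TwoPhasePublishExample
{
  public static void main(String[] args) throws Exception
  {
    // Phase 1: publish segment metadata. Supervised Kafka tasks block here without
    // a timeout; KafkaSupervisor kills them via completionTimeout if they get stuck.
    CompletableFuture<String> publishFuture = CompletableFuture.supplyAsync(() -> "segments+metadata");
    String published = publishFuture.get();

    // Phase 2: register handoff and wait, optionally bounded by the deprecated
    // handoffConditionTimeout (0 means wait forever).
    long handoffConditionTimeout = 60_000;
    CompletableFuture<String> handoffFuture = CompletableFuture.supplyAsync(() -> "handed off: " + published);
    String handedOff = handoffConditionTimeout == 0
                       ? handoffFuture.get()
                       : handoffFuture.get(handoffConditionTimeout, TimeUnit.MILLISECONDS);

    System.out.println(handedOff);
  }
}
```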
@@ -865,8 +883,6 @@ private FiniteAppenderatorDriver newDriver(
        toolbox.getSegmentHandoffNotifierFactory(),
        new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()),
        toolbox.getObjectMapper(),
-        tuningConfig.getMaxRowsPerSegment(),
-        tuningConfig.getHandoffConditionTimeout(),
        metrics
    );
  }
@@ -42,6 +42,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
  private final IndexSpec indexSpec;
  private final boolean buildV9Directly;
  private final boolean reportParseExceptions;
+  @Deprecated
  private final long handoffConditionTimeout;
  private final boolean resetOffsetAutomatically;

@@ -152,6 +153,7 @@ public boolean isReportParseExceptions()
    return reportParseExceptions;
  }

+  @Deprecated
  @JsonProperty
  public long getHandoffConditionTimeout()
  {
@@ -44,7 +44,7 @@ public KafkaSupervisorTuningConfig(
      @JsonProperty("indexSpec") IndexSpec indexSpec,
      @JsonProperty("buildV9Directly") Boolean buildV9Directly,
      @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
-      @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
+      @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, // for backward compatibility
      @JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically,
      @JsonProperty("workerThreads") Integer workerThreads,
      @JsonProperty("chatThreads") Integer chatThreads,
@@ -62,6 +62,8 @@ public KafkaSupervisorTuningConfig(
        indexSpec,
        buildV9Directly,
        reportParseExceptions,
+        // Supervised kafka tasks should respect KafkaSupervisorIOConfig.completionTimeout instead of
+        // handoffConditionTimeout
        handoffConditionTimeout,
        resetOffsetAutomatically
    );
@@ -104,7 +104,18 @@ public void testSerdeWithNonDefaults() throws Exception
  @Test
  public void testCopyOf() throws Exception
  {
-    KafkaTuningConfig original = new KafkaTuningConfig(1, 2, new Period("PT3S"), new File("/tmp/xxx"), 4, new IndexSpec(), true, true, 5L, null);
+    KafkaTuningConfig original = new KafkaTuningConfig(
+        1,
+        2,
+        new Period("PT3S"),
+        new File("/tmp/xxx"),
+        4,
+        new IndexSpec(),
+        true,
+        true,
+        5L,
+        null
+    );
    KafkaTuningConfig copy = KafkaTuningConfig.copyOf(original);

    Assert.assertEquals(1, copy.getMaxRowsInMemory());
@@ -30,6 +30,7 @@
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedMap;
@@ -74,6 +75,7 @@
import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.AppenderatorConfig;
+import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import io.druid.segment.realtime.appenderator.Appenderators;
import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver;
import io.druid.segment.realtime.appenderator.SegmentAllocator;
@@ -100,6 +102,9 @@
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;

public class IndexTask extends AbstractTask
{
@@ -451,9 +456,9 @@ public SegmentIdentifier allocate(DateTime timestamp, String sequenceName, Strin
            sequenceNameToShardSpecMap.put(sequenceName, shardSpecForPublishing);
          }

-          final SegmentIdentifier identifier = driver.add(inputRow, sequenceName, committerSupplier);
+          final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName, committerSupplier);

-          if (identifier == null) {
+          if (!addResult.isOk()) {
            throw new ISE("Could not allocate segment for row with timestamp[%s]", inputRow.getTimestamp());
          }
@@ -482,7 +487,22 @@ public boolean publishSegments(Set<DataSegment> segments, Object commitMetadata)
        }
      };

-      final SegmentsAndMetadata published = driver.finish(publisher, committerSupplier.get());
+      final SegmentsAndMetadata published;
+      final long publishTimeout = ingestionSchema.getTuningConfig().getPublishTimeout();
+      if (publishTimeout == 0) {
+        published = driver.publish(
+            publisher,
+            committerSupplier.get(),
+            sequenceNameToShardSpecMap.keySet()
+        ).get();
+      } else {
+        published = driver.publish(
+            publisher,
+            committerSupplier.get(),
+            sequenceNameToShardSpecMap.keySet()
+        ).get(publishTimeout, TimeUnit.MILLISECONDS);
+      }

      if (published == null) {
        log.error("Failed to publish segments, aborting!");
        return false;

Review thread on the first `driver.publish(...)` call:

pjain1 (Member) commented on May 24, 2017:
  I think as per the current KafkaIndexTask implementation the task should just block until publish succeeds

Contributor (author) replied:
  Thanks. I removed.
@@ -505,6 +525,9 @@ public String apply(DataSegment input)
        return true;
      }
    }
+    catch (TimeoutException | ExecutionException e) {
+      throw Throwables.propagate(e);
+    }
  }

  private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox, DataSchema dataSchema)
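The hunk above waits forever when `publishTimeout` is zero, otherwise bounds the `Future.get` call and rethrows checked failures unchecked via `Throwables.propagate`. A runnable plain-JDK illustration of that pattern (names hypothetical, not Druid code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedGetExample
{
  // Wait forever when timeoutMillis == 0, otherwise bound the wait; checked
  // future exceptions are rethrown unchecked, mirroring Throwables.propagate().
  static <T> T awaitPublish(CompletableFuture<T> future, long timeoutMillis)
  {
    try {
      return timeoutMillis == 0 ? future.get() : future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    catch (TimeoutException | ExecutionException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args)
  {
    CompletableFuture<String> future = CompletableFuture.completedFuture("published");
    System.out.println(awaitPublish(future, 0));      // block until done
    System.out.println(awaitPublish(future, 60_000)); // bounded wait
  }
}
```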
@@ -533,8 +556,6 @@ private FiniteAppenderatorDriver newDriver(
        new NoopSegmentHandoffNotifierFactory(), // don't wait for handoff since we don't serve queries
        new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()),
        toolbox.getObjectMapper(),
-        Integer.MAX_VALUE, // rows for a partition is already determined by the shardSpec
-        0,
        metrics
    );
  }
@@ -558,7 +579,7 @@ public IndexIngestionSpec(
    this.ioConfig = ioConfig;
    this.tuningConfig = tuningConfig == null
                        ?
-                        new IndexTuningConfig(null, null, null, null, null, null, null, null, (File) null)
+                        new IndexTuningConfig(null, null, null, null, null, null, null, null, null, (File) null)
                        : tuningConfig;
  }

@@ -624,6 +645,7 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig
    private static final boolean DEFAULT_BUILD_V9_DIRECTLY = true;
    private static final boolean DEFAULT_FORCE_EXTENDABLE_SHARD_SPECS = false;
    private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false;
+    private static final long DEFAULT_PUBLISH_TIMEOUT = 0;

    static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000;

@@ -636,6 +658,7 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig
    private final boolean buildV9Directly;
    private final boolean forceExtendableShardSpecs;
    private final boolean reportParseExceptions;
+    private final long publishTimeout;

    @JsonCreator
    public IndexTuningConfig(
@@ -647,7 +670,8 @@ public IndexTuningConfig(
        @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
        @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
        @JsonProperty("forceExtendableShardSpecs") @Nullable Boolean forceExtendableShardSpecs,
-        @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions
+        @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
+        @JsonProperty("publishTimeout") @Nullable Long publishTimeout
    )
    {
      this(
@@ -659,6 +683,7 @@ public IndexTuningConfig(
          buildV9Directly,
          forceExtendableShardSpecs,
          reportParseExceptions,
+          publishTimeout,
          null
      );
    }
@@ -672,6 +697,7 @@ private IndexTuningConfig(
        @Nullable Boolean buildV9Directly,
        @Nullable Boolean forceExtendableShardSpecs,
        @Nullable Boolean reportParseExceptions,
+        @Nullable Long publishTimeout,
        @Nullable File basePersistDirectory
    )
    {
@@ -696,6 +722,7 @@ private IndexTuningConfig(
      this.reportParseExceptions = reportParseExceptions == null
                                   ? DEFAULT_REPORT_PARSE_EXCEPTIONS
                                   : reportParseExceptions;
+      this.publishTimeout = publishTimeout == null ? DEFAULT_PUBLISH_TIMEOUT : publishTimeout;
      this.basePersistDirectory = basePersistDirectory;
    }

@@ -710,6 +737,7 @@ public IndexTuningConfig withBasePersistDirectory(File dir)
        buildV9Directly,
        forceExtendableShardSpecs,
        reportParseExceptions,
+        publishTimeout,
        dir
    );
  }
@@ -772,6 +800,12 @@ public boolean isForceExtendableShardSpecs()
      return forceExtendableShardSpecs;
    }

+    @JsonProperty
+    public long getPublishTimeout()
+    {
+      return publishTimeout;
+    }
+
    @Override
    public Period getIntermediatePersistPeriod()
    {
@@ -606,7 +606,8 @@ private IndexTask.IndexIngestionSpec createIngestionSpec(
            null,
            true,
            forceExtendableShardSpecs,
-            true
+            true,
+            null
        )
    );
  }