Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.TuningConfig;
import io.druid.segment.realtime.appenderator.AppenderatorConfig;
import io.druid.segment.realtime.plumber.SinkFactory;
import org.joda.time.Period;

import java.io.File;
Expand All @@ -44,6 +46,8 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
@Deprecated
private final long handoffConditionTimeout;
private final boolean resetOffsetAutomatically;
private final SinkFactory sinkFactory;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please extract an umbrella abstraction like "SegmentStrategy" with createSinkFactory() and createIndexMerger() methods, and inject/serialize/deserialize only it, here and in other places.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

private final IndexMerger customIndexMerger;

@JsonCreator
public KafkaTuningConfig(
Expand All @@ -57,7 +61,9 @@ public KafkaTuningConfig(
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
@JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically
@JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically,
@JsonProperty("sinkFactory") SinkFactory sinkFactory,
@JsonProperty("customIndexMerger") IndexMerger customIndexMerger
)
{
// Cannot be a static because default basePersistDirectory is unique per-instance
Expand All @@ -80,6 +86,8 @@ public KafkaTuningConfig(
this.resetOffsetAutomatically = resetOffsetAutomatically == null
? DEFAULT_RESET_OFFSET_AUTOMATICALLY
: resetOffsetAutomatically;
this.sinkFactory = sinkFactory == null ? defaults.getSinkFactory() : sinkFactory;
this.customIndexMerger = customIndexMerger;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to handle null here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The caller will handle a null customIndexMerger, like in IndexTask:

        ingestionSchema.getTuningConfig().getCustomIndexMerger() != null
        ? ingestionSchema.getTuningConfig().getCustomIndexMerger() : toolbox.getIndexMergerV9()

customIndexMerger is a customized IndexMerger which can be null if the user doesn't need it.
If we set a default IndexMerger implementation as customIndexMerger, then I think customIndexMerger should be renamed to indexMerger, which should never be null according to the literal meaning.

}

public static KafkaTuningConfig copyOf(KafkaTuningConfig config)
Expand All @@ -94,7 +102,9 @@ public static KafkaTuningConfig copyOf(KafkaTuningConfig config)
true,
config.reportParseExceptions,
config.handoffConditionTimeout,
config.resetOffsetAutomatically
config.resetOffsetAutomatically,
config.sinkFactory,
config.customIndexMerger
);
}

Expand Down Expand Up @@ -125,6 +135,20 @@ public File getBasePersistDirectory()
return basePersistDirectory;
}

@Override
@JsonProperty
public SinkFactory getSinkFactory()
{
return sinkFactory;
}

@Override
@JsonProperty
public IndexMerger getCustomIndexMerger()
{
return customIndexMerger;
}

@Override
@JsonProperty
public int getMaxPendingPersists()
Expand Down Expand Up @@ -181,7 +205,9 @@ public KafkaTuningConfig withBasePersistDirectory(File dir)
true,
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically
resetOffsetAutomatically,
sinkFactory,
customIndexMerger
);
}

Expand All @@ -197,7 +223,9 @@ public KafkaTuningConfig withMaxRowsInMemory(int rows)
true,
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically
resetOffsetAutomatically,
sinkFactory,
customIndexMerger
);
}

Expand Down Expand Up @@ -241,8 +269,15 @@ public boolean equals(Object o)
: that.basePersistDirectory != null) {
return false;
}
return indexSpec != null ? indexSpec.equals(that.indexSpec) : that.indexSpec == null;

if (indexSpec != null ? !indexSpec.equals(that.indexSpec) : that.indexSpec != null) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor to use Objects.equals()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's generated by IntelliJ IDEA, but sure, I will modify it.

Copy link
Copy Markdown
Member

@leventov leventov Jun 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can generate the forms that I suggested with IntelliJ as well; you need to choose another option in the generation dialog. Once you choose it, it will be the default later.

return false;
}
if (sinkFactory != null ? !sinkFactory.equals(that.sinkFactory) : that.sinkFactory != null) {
return false;
}
return customIndexMerger != null
? customIndexMerger.equals(that.customIndexMerger)
: that.customIndexMerger == null;
}

@Override
Expand All @@ -257,6 +292,8 @@ public int hashCode()
result = 31 * result + (reportParseExceptions ? 1 : 0);
result = 31 * result + (int) (handoffConditionTimeout ^ (handoffConditionTimeout >>> 32));
result = 31 * result + (resetOffsetAutomatically ? 1 : 0);
result = 31 * result + (sinkFactory != null ? sinkFactory.hashCode() : 0);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor to use Objects.hash()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

result = 31 * result + (customIndexMerger != null ? customIndexMerger.hashCode() : 0);
return result;
}

Expand All @@ -273,6 +310,8 @@ public String toString()
", reportParseExceptions=" + reportParseExceptions +
", handoffConditionTimeout=" + handoffConditionTimeout +
", resetOffsetAutomatically=" + resetOffsetAutomatically +
", sinkFactory=" + sinkFactory +
", customIndexMerger=" + customIndexMerger +
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IndexMerger doesn't provide a proper toString() method.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IndexMerger is an interface now; implementations of this interface should implement toString().

'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public KafkaSupervisorSpec(
null,
null,
null,
null,
null,
null
);
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexing.kafka.KafkaTuningConfig;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.realtime.plumber.SinkFactory;
import org.joda.time.Duration;
import org.joda.time.Period;

Expand All @@ -48,6 +50,8 @@ public KafkaSupervisorTuningConfig(
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, // for backward compatibility
@JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically,
@JsonProperty("sinkFactory") SinkFactory sinkFactory,
@JsonProperty("customIndexMerger") IndexMerger customIndexMerger,
@JsonProperty("workerThreads") Integer workerThreads,
@JsonProperty("chatThreads") Integer chatThreads,
@JsonProperty("chatRetries") Long chatRetries,
Expand All @@ -68,7 +72,9 @@ public KafkaSupervisorTuningConfig(
// Supervised kafka tasks should respect KafkaSupervisorIOConfig.completionTimeout instead of
// handoffConditionTimeout
handoffConditionTimeout,
resetOffsetAutomatically
resetOffsetAutomatically,
sinkFactory,
customIndexMerger
);

this.workerThreads = workerThreads;
Expand Down Expand Up @@ -128,6 +134,8 @@ public String toString()
", reportParseExceptions=" + isReportParseExceptions() +
", handoffConditionTimeout=" + getHandoffConditionTimeout() +
", resetOffsetAutomatically=" + isResetOffsetAutomatically() +
", sinkFactory=" + getSinkFactory() +
", customIndexMerger=" + getCustomIndexMerger() +
", workerThreads=" + workerThreads +
", chatThreads=" + chatThreads +
", chatRetries=" + chatRetries +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1389,7 +1389,9 @@ private KafkaIndexTask createTask(
true,
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically
resetOffsetAutomatically,
null,
null
);
final KafkaIndexTask task = new KafkaIndexTask(
taskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ public void testCopyOf() throws Exception
true,
true,
5L,
null,
null,
null
);
KafkaTuningConfig copy = KafkaTuningConfig.copyOf(original);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ public void setupTest() throws Exception
false,
null,
null,
null,
null,
numThreads,
TEST_CHAT_THREADS,
TEST_CHAT_RETRIES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
Expand Down Expand Up @@ -98,7 +99,7 @@ public Plumber findPlumber(
)
{
// There can be only one.
final Sink theSink = new Sink(
final Sink theSink = config.getSinkFactory().create(
interval,
schema,
config.getShardSpec(),
Expand All @@ -113,6 +114,10 @@ public Plumber findPlumber(
// Set of spilled segments. Will be merged at the end.
final Set<File> spilled = Sets.newHashSet();

// IndexMerger implementation.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not useful comment

final IndexMerger theIndexMerger = config.getCustomIndexMerger() != null
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding null handling - doesn't Jackson handle it up front, when you specify defaultImpl=... on the IndexMerger interface (or SegmentStrategy for that matter, if you apply suggestion above)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually the default implementation is provided in the class context, like indexMergerV9 here. Also, TaskToolbox has the default implementation, e.g. toolbox.getIndexMergerV9().
So I think it's better to retain the old code here to obtain the default implementation.

? config.getCustomIndexMerger() : indexMergerV9;

return new Plumber()
{
@Override
Expand Down Expand Up @@ -181,7 +186,7 @@ public void finishJob()
}

fileToUpload = new File(tmpSegmentDir, "merged");
indexMergerV9.mergeQueryableIndex(indexes, schema.getGranularitySpec().isRollup(), schema.getAggregators(), fileToUpload, config.getIndexSpec());
theIndexMerger.mergeQueryableIndex(indexes, schema.getGranularitySpec().isRollup(), schema.getAggregators(), fileToUpload, config.getIndexSpec());
}

// Map merged segment so we can extract dimensions
Expand Down Expand Up @@ -226,7 +231,7 @@ private void spillIfSwappable()
log.info("Spilling index[%d] with rows[%d] to: %s", indexToPersist.getCount(), rowsToPersist, dirToPersist);

try {
indexMergerV9.persist(
theIndexMerger.persist(
indexToPersist.getIndex(),
dirToPersist,
config.getIndexSpec()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.query.DruidMetrics;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IOConfig;
Expand All @@ -83,7 +84,9 @@
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import io.druid.segment.realtime.plumber.Committers;
import io.druid.segment.realtime.plumber.DefaultSinkFactory;
import io.druid.segment.realtime.plumber.NoopSegmentHandoffNotifierFactory;
import io.druid.segment.realtime.plumber.SinkFactory;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
Expand Down Expand Up @@ -553,7 +556,8 @@ private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox
toolbox.getSegmentPusher(),
toolbox.getObjectMapper(),
toolbox.getIndexIO(),
toolbox.getIndexMergerV9()
ingestionSchema.getTuningConfig().getCustomIndexMerger() != null
? ingestionSchema.getTuningConfig().getCustomIndexMerger() : toolbox.getIndexMergerV9()
);
}

Expand Down Expand Up @@ -593,7 +597,7 @@ public IndexIngestionSpec(
this.ioConfig = ioConfig;
this.tuningConfig = tuningConfig == null
?
new IndexTuningConfig(null, null, null, null, null, null, null, null, (File) null)
new IndexTuningConfig(null, null, null, null, null, null, null, null, (File) null, null, null)
: tuningConfig;
}

Expand Down Expand Up @@ -660,6 +664,7 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi
private static final boolean DEFAULT_FORCE_EXTENDABLE_SHARD_SPECS = false;
private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false;
private static final long DEFAULT_PUBLISH_TIMEOUT = 0;
private static final SinkFactory DEFAULT_SINK_FACTORY = new DefaultSinkFactory();

static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000;

Expand All @@ -672,6 +677,8 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi
private final boolean forceExtendableShardSpecs;
private final boolean reportParseExceptions;
private final long publishTimeout;
private final SinkFactory sinkFactory;
private final IndexMerger customIndexMerger;

@JsonCreator
public IndexTuningConfig(
Expand All @@ -685,7 +692,9 @@ public IndexTuningConfig(
@JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
@JsonProperty("forceExtendableShardSpecs") @Nullable Boolean forceExtendableShardSpecs,
@JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
@JsonProperty("publishTimeout") @Nullable Long publishTimeout
@JsonProperty("publishTimeout") @Nullable Long publishTimeout,
@JsonProperty("sinkFactory") @Nullable SinkFactory sinkFactory,
@JsonProperty("customIndexMerger") @Nullable IndexMerger customIndexMerger
)
{
this(
Expand All @@ -697,7 +706,9 @@ public IndexTuningConfig(
forceExtendableShardSpecs,
reportParseExceptions,
publishTimeout,
null
null,
sinkFactory,
customIndexMerger
);
}

Expand All @@ -710,7 +721,9 @@ private IndexTuningConfig(
@Nullable Boolean forceExtendableShardSpecs,
@Nullable Boolean reportParseExceptions,
@Nullable Long publishTimeout,
@Nullable File basePersistDirectory
@Nullable File basePersistDirectory,
@Nullable SinkFactory sinkFactory,
@Nullable IndexMerger customIndexMerger
)
{
Preconditions.checkArgument(
Expand All @@ -735,6 +748,8 @@ private IndexTuningConfig(
: reportParseExceptions;
this.publishTimeout = publishTimeout == null ? DEFAULT_PUBLISH_TIMEOUT : publishTimeout;
this.basePersistDirectory = basePersistDirectory;
this.sinkFactory = sinkFactory == null ? DEFAULT_SINK_FACTORY : sinkFactory;
this.customIndexMerger = customIndexMerger;
}

public IndexTuningConfig withBasePersistDirectory(File dir)
Expand All @@ -748,7 +763,9 @@ public IndexTuningConfig withBasePersistDirectory(File dir)
forceExtendableShardSpecs,
reportParseExceptions,
publishTimeout,
dir
dir,
sinkFactory,
customIndexMerger
);
}

Expand Down Expand Up @@ -784,6 +801,20 @@ public File getBasePersistDirectory()
return basePersistDirectory;
}

@JsonProperty
@Override
public SinkFactory getSinkFactory()
{
return sinkFactory;
}

@JsonProperty
@Override
public IndexMerger getCustomIndexMerger()
{
return customIndexMerger;
}

@JsonProperty
@Override
public int getMaxPendingPersists()
Expand Down
Loading