Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,14 @@ default boolean isSplittable()
{
return false;
}

/**
* Some FirehoseFactory implementations require the indexing service's TaskToolbox to function.
* Use this method to provide it to them. The argument should always be a TaskToolbox; the
* method signature uses Object so that FirehoseFactories don't all have to be inside the
* indexing-service module.
*/
default void setTaskToolbox(Object taskToolbox)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm really not sure how we can make this better without huge refactoring.. Let's open an issue about it after this PR.

{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
Expand Down Expand Up @@ -84,7 +83,6 @@
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
Expand Down Expand Up @@ -419,7 +417,7 @@ public TaskStatus run(final TaskToolbox toolbox)

final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();

setFirehoseFactoryToolbox(firehoseFactory, toolbox);
firehoseFactory.setTaskToolbox(toolbox);

final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
// Firehose temporary directory is automatically removed when this IndexTask completes.
Expand Down Expand Up @@ -489,25 +487,6 @@ public TaskStatus run(final TaskToolbox toolbox)
}
}

// pass toolbox to any IngestSegmentFirehoseFactory
private void setFirehoseFactoryToolbox(FirehoseFactory firehoseFactory, TaskToolbox toolbox)
{
if (firehoseFactory instanceof IngestSegmentFirehoseFactory) {
((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox);
return;
}

if (firehoseFactory instanceof CombiningFirehoseFactory) {
for (FirehoseFactory delegateFactory : ((CombiningFirehoseFactory) firehoseFactory).getDelegateFactoryList()) {
if (delegateFactory instanceof IngestSegmentFirehoseFactory) {
((IngestSegmentFirehoseFactory) delegateFactory).setTaskToolbox(toolbox);
} else if (delegateFactory instanceof CombiningFirehoseFactory) {
setFirehoseFactoryToolbox(delegateFactory, toolbox);
}
}
}
}

private Map<String, TaskReport> getTaskCompletionReports()
{
return TaskReport.buildTaskReports(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -184,11 +183,7 @@ public String getSupervisorTaskId()
public TaskStatus run(final TaskToolbox toolbox) throws Exception
{
final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();

if (firehoseFactory instanceof IngestSegmentFirehoseFactory) {
// pass toolbox to Firehose
((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox);
}
firehoseFactory.setTaskToolbox(toolbox);

final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
// Firehose temporary directory is automatically removed when this IndexTask completes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,10 @@ public List<String> getMetrics()
return metrics;
}

public void setTaskToolbox(TaskToolbox taskToolbox)
@Override
public void setTaskToolbox(Object taskToolbox)
{
this.taskToolbox = taskToolbox;
this.taskToolbox = (TaskToolbox) taskToolbox;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import com.google.common.io.Files;
import com.google.inject.Binder;
import com.google.inject.Module;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
Expand Down Expand Up @@ -81,7 +83,7 @@
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.realtime.firehose.IngestSegmentFirehose;
import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
Expand Down Expand Up @@ -342,27 +344,33 @@ public DataSegment restore(DataSegment segment)
null,
ImmutableList.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME)
)) {
final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory(
TASK.getDataSource(),
Intervals.ETERNITY,
new SelectorDimFilter(DIM_NAME, DIM_VALUE, null),
dim_names,
metric_names,
INDEX_IO
);
factory.setTaskToolbox(taskToolboxFactory.build(TASK));
values.add(
new Object[]{
StringUtils.format(
"DimNames[%s]MetricNames[%s]ParserDimNames[%s]",
dim_names == null ? "null" : "dims",
metric_names == null ? "null" : "metrics",
parser == ROW_PARSER ? "dims" : "null"
),
factory,
parser
}
);
for (Boolean wrapInCombining : Arrays.asList(false, true)) {
final IngestSegmentFirehoseFactory isfFactory = new IngestSegmentFirehoseFactory(
TASK.getDataSource(),
Intervals.ETERNITY,
new SelectorDimFilter(DIM_NAME, DIM_VALUE, null),
dim_names,
metric_names,
INDEX_IO
);
final FirehoseFactory factory = wrapInCombining
? new CombiningFirehoseFactory(ImmutableList.of(isfFactory))
: isfFactory;
factory.setTaskToolbox(taskToolboxFactory.build(TASK));
values.add(
new Object[]{
StringUtils.format(
"DimNames[%s]MetricNames[%s]ParserDimNames[%s]WrapInCombining[%s]",
dim_names == null ? "null" : "dims",
metric_names == null ? "null" : "metrics",
parser == ROW_PARSER ? "dims" : "null",
wrapInCombining
),
factory,
parser
}
);
}
}
}
}
Expand Down Expand Up @@ -407,7 +415,7 @@ public void configure(Binder binder)

public IngestSegmentFirehoseFactoryTest(
String testName,
IngestSegmentFirehoseFactory factory,
FirehoseFactory factory,
InputRowParser rowParser
)
{
Expand Down Expand Up @@ -436,7 +444,7 @@ public IngestSegmentFirehoseFactoryTest(
private static final File persistDir = Paths.get(tmpDir.getAbsolutePath(), "indexTestMerger").toFile();
private static final List<DataSegment> segmentSet = new ArrayList<>(MAX_SHARD_NUMBER);

private final IngestSegmentFirehoseFactory factory;
private final FirehoseFactory<InputRowParser> factory;
private final InputRowParser rowParser;

private static final InputRowParser<Map<String, Object>> ROW_PARSER = new MapInputRowParser(
Expand Down Expand Up @@ -518,15 +526,20 @@ private static void recursivelyDelete(final File dir)
@Test
public void sanityTest()
{
Assert.assertEquals(TASK.getDataSource(), factory.getDataSource());
if (factory.getDimensions() != null) {
Assert.assertArrayEquals(new String[]{DIM_NAME}, factory.getDimensions().toArray());
if (factory instanceof CombiningFirehoseFactory) {
// This method tests IngestSegmentFirehoseFactory-specific methods.
return;
}
final IngestSegmentFirehoseFactory isfFactory = (IngestSegmentFirehoseFactory) factory;
Assert.assertEquals(TASK.getDataSource(), isfFactory.getDataSource());
if (isfFactory.getDimensions() != null) {
Assert.assertArrayEquals(new String[]{DIM_NAME}, isfFactory.getDimensions().toArray());
}
Assert.assertEquals(Intervals.ETERNITY, factory.getInterval());
if (factory.getMetrics() != null) {
Assert.assertEquals(Intervals.ETERNITY, isfFactory.getInterval());
if (isfFactory.getMetrics() != null) {
Assert.assertEquals(
ImmutableSet.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME),
ImmutableSet.copyOf(factory.getMetrics())
ImmutableSet.copyOf(isfFactory.getMetrics())
);
}
}
Expand All @@ -536,15 +549,17 @@ public void simpleFirehoseReadingTest() throws IOException
{
Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), segmentSet.size());
Integer rowcount = 0;
try (final IngestSegmentFirehose firehose =
(IngestSegmentFirehose)
factory.connect(rowParser, null)) {
try (final Firehose firehose = factory.connect(rowParser, null)) {
while (firehose.hasMore()) {
InputRow row = firehose.nextRow();
Assert.assertArrayEquals(new String[]{DIM_NAME}, row.getDimensions().toArray());
Assert.assertArrayEquals(new String[]{DIM_VALUE}, row.getDimension(DIM_NAME).toArray());
Assert.assertEquals(METRIC_LONG_VALUE.longValue(), row.getMetric(METRIC_LONG_NAME));
Assert.assertEquals(METRIC_FLOAT_VALUE, row.getMetric(METRIC_FLOAT_NAME).floatValue(), METRIC_FLOAT_VALUE * 0.0001);
Assert.assertEquals(
METRIC_FLOAT_VALUE,
row.getMetric(METRIC_FLOAT_NAME).floatValue(),
METRIC_FLOAT_VALUE * 0.0001
);
++rowcount;
}
}
Expand All @@ -563,9 +578,8 @@ public void testTransformSpec() throws IOException
)
);
int skipped = 0;
try (final IngestSegmentFirehose firehose =
(IngestSegmentFirehose)
factory.connect(transformSpec.decorate(rowParser), null)) {
try (final Firehose firehose =
factory.connect(transformSpec.decorate(rowParser), null)) {
while (firehose.hasMore()) {
InputRow row = firehose.nextRow();
if (row == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.segment.realtime.firehose;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Predicate;
import org.apache.druid.data.input.Firehose;
Expand Down Expand Up @@ -61,6 +62,13 @@ public Interval getInterval()
return interval;
}

@Override
@JsonIgnore
public void setTaskToolbox(Object taskToolbox)
{
delegate.setTaskToolbox(taskToolbox);
}

@Override
public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.segment.realtime.firehose;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
Expand Down Expand Up @@ -65,6 +66,15 @@ public List<FirehoseFactory> getDelegateFactoryList()
return delegateFactoryList;
}

@Override
@JsonIgnore
public void setTaskToolbox(Object taskToolbox)
{
for (FirehoseFactory delegateFactory : delegateFactoryList) {
delegateFactory.setTaskToolbox(taskToolbox);
}
}

class CombiningFirehose implements Firehose
{
private final InputRowParser parser;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.segment.realtime.firehose;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.Firehose;
Expand Down Expand Up @@ -61,6 +62,13 @@ public int getCount()
return count;
}

@Override
@JsonIgnore
public void setTaskToolbox(Object taskToolbox)
{
delegate.setTaskToolbox(taskToolbox);
}

@Override
public Firehose connect(final InputRowParser parser, File temporaryDirectory) throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.segment.realtime.firehose;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
Expand Down Expand Up @@ -139,4 +140,11 @@ public DateTime getShutoffTime()
{
return shutoffTime;
}

@Override
@JsonIgnore
public void setTaskToolbox(Object taskToolbox)
{
delegateFactory.setTaskToolbox(taskToolbox);
}
}