Merged
Changes from all commits
@@ -235,7 +235,7 @@ protected void setup(Context context)
   @Override
   protected void innerMap(
       InputRow inputRow,
-      Writable value,
+      Object value,
       Context context
   ) throws IOException, InterruptedException
   {
@@ -259,7 +259,7 @@ protected void setup(Context context)
   @Override
   protected void innerMap(
       InputRow inputRow,
-      Writable value,
+      Object value,
       Context context
   ) throws IOException, InterruptedException
   {
@@ -340,7 +340,7 @@ protected void setup(Context context)
   @Override
   protected void innerMap(
       InputRow inputRow,
-      Writable value,
+      Object value,
       Context context
   ) throws IOException, InterruptedException
   {
@@ -378,7 +378,7 @@ public DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig conf
   }

   public void emitDimValueCounts(
-      TaskInputOutputContext<? extends Writable, ? extends Writable, BytesWritable, Text> context,
+      TaskInputOutputContext<?, ?, BytesWritable, Text> context,
       DateTime timestamp,
       Map<String, Iterable<String>> dims
   ) throws IOException, InterruptedException
@@ -891,7 +891,7 @@ public static DimValueCount fromText(Text text)
   }

   private static void write(
-      TaskInputOutputContext<? extends Writable, ? extends Writable, BytesWritable, Text> context,
+      TaskInputOutputContext<?, ?, BytesWritable, Text> context,
       final byte[] groupKey,
       DimValueCount dimValueCount
   )
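Why the bounded wildcards go away: emitDimValueCounts and write receive the calling mapper's Context, and once the mappers declare Object input key/value types (see the HadoopDruidIndexerMapper change below), a Mapper<Object, Object, BytesWritable, Text>.Context no longer satisfies TaskInputOutputContext<? extends Writable, ? extends Writable, ...>. A minimal compile-level sketch of that constraint, not part of the PR (the class and method names here are made up):

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;

public class ContextBoundSketch extends Mapper<Object, Object, BytesWritable, Text>
{
  // Accepts the context of any mapper, regardless of its input key/value types.
  static void emit(TaskInputOutputContext<?, ?, BytesWritable, Text> context)
  {
    // With the old bound <? extends Writable, ? extends Writable, ...> the call in
    // map() below would not compile, because Object is not a subtype of Writable.
  }

  @Override
  protected void map(Object key, Object value, Context context)
  {
    emit(context); // fine with the unbounded signature
  }
}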
@@ -41,11 +41,11 @@

 import com.metamx.common.RE;

-public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<Writable, Writable, KEYOUT, VALUEOUT>
+public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<Object, Object, KEYOUT, VALUEOUT>
 {
   private static final Logger log = new Logger(HadoopDruidIndexerMapper.class);

-  private HadoopDruidIndexerConfig config;
+  protected HadoopDruidIndexerConfig config;
   private InputRowParser parser;
   protected GranularitySpec granularitySpec;

@@ -70,7 +70,7 @@ public InputRowParser getParser()

   @Override
   protected void map(
-      Writable key, Writable value, Context context
+      Object key, Object value, Context context
   ) throws IOException, InterruptedException
   {
     try {
@@ -99,7 +99,7 @@ protected void map(
     }
   }

-  public final static InputRow parseInputRow(Writable value, InputRowParser parser)
+  public final static InputRow parseInputRow(Object value, InputRowParser parser)
   {
     if(parser instanceof StringInputRowParser && value instanceof Text) {
       //Note: This is to ensure backward compatibility with 0.7.0 and before
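The hunk cuts off before the method returns; per the inline note, the instanceof check keeps the 0.7.0-and-earlier behaviour of feeding a Text record to a StringInputRowParser as a String, while any other value type is handed to the parser unchanged. A hedged reconstruction of that dispatch, for illustration only (the actual body is truncated above; package names assume Druid 0.7.x):

import org.apache.hadoop.io.Text;

import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.StringInputRowParser;

public class ParseDispatchSketch
{
  // Illustrative reconstruction of the truncated method, not copied from the PR.
  public static InputRow parseInputRow(Object value, InputRowParser parser)
  {
    if (parser instanceof StringInputRowParser && value instanceof Text) {
      // Legacy path (0.7.0 and before): the string parser gets the record's String form.
      return ((StringInputRowParser) parser).parse(value.toString());
    } else {
      // General path: hand the record to the parser as-is, whatever its type.
      return parser.parse(value);
    }
  }
}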
@@ -109,7 +109,7 @@ public final static InputRow parseInputRow(Writable value, InputRowParser parser
     }
   }

-  abstract protected void innerMap(InputRow inputRow, Writable value, Context context)
+  abstract protected void innerMap(InputRow inputRow, Object value, Context context)
       throws IOException, InterruptedException;

 }
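For subclasses, the practical effect is that innerMap now receives the raw record as an Object, so the mapper keeps working when the InputFormat produces something other than a Writable. A hedged sketch of a concrete subclass under the new signature (the class name, output types, and emitted value are invented; only the overridden method shape and the protected config/granularitySpec fields come from the diff above, and the io.druid.indexer package is assumed):

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.joda.time.DateTime;

import io.druid.data.input.InputRow;
import io.druid.indexer.HadoopDruidIndexerMapper;

public class ExampleIndexerMapper extends HadoopDruidIndexerMapper<NullWritable, Text>
{
  @Override
  protected void innerMap(InputRow inputRow, Object value, Context context)
      throws IOException, InterruptedException
  {
    // 'config' and 'granularitySpec' are visible here now that the fields are protected.
    final DateTime timestamp = new DateTime(inputRow.getTimestampFromEpoch());
    if (granularitySpec.bucketInterval(timestamp).isPresent()) {
      // 'value' still carries the raw record in whatever type the InputFormat produced;
      // here we only emit something derived from the parsed row.
      context.write(NullWritable.get(), new Text(inputRow.getDimensions().toString()));
    }
  }
}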
@@ -42,6 +42,7 @@ public class HadoopTuningConfig implements TuningConfig
   private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000;
   private static final int DEFAULT_BUFFER_SIZE = 128 * 1024 * 1024;
   private static final float DEFAULT_AGG_BUFFER_RATIO = 0.5f;
+  private static final boolean DEFAULT_USE_COMBINER = false;

   public static HadoopTuningConfig makeDefaultTuningConfig()
   {
@@ -61,7 +62,8 @@ public static HadoopTuningConfig makeDefaultTuningConfig()
         false,
         false,
         DEFAULT_BUFFER_SIZE,
-        DEFAULT_AGG_BUFFER_RATIO
+        DEFAULT_AGG_BUFFER_RATIO,
+        DEFAULT_USE_COMBINER
     );
   }

@@ -81,6 +83,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig()
   private final boolean ingestOffheap;
   private final int bufferSize;
   private final float aggregationBufferRatio;
+  private final boolean useCombiner;

   @JsonCreator
   public HadoopTuningConfig(
@@ -99,7 +102,8 @@ public HadoopTuningConfig(
       final @JsonProperty("persistInHeap") boolean persistInHeap,
       final @JsonProperty("ingestOffheap") boolean ingestOffheap,
       final @JsonProperty("bufferSize") Integer bufferSize,
-      final @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio
+      final @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio,
+      final @JsonProperty("useCombiner") Boolean useCombiner
   )
   {
     this.workingPath = workingPath;
@@ -120,6 +124,7 @@ public HadoopTuningConfig(
     this.ingestOffheap = ingestOffheap;
     this.bufferSize = bufferSize == null ? DEFAULT_BUFFER_SIZE : bufferSize;
     this.aggregationBufferRatio = aggregationBufferRatio == null ? DEFAULT_AGG_BUFFER_RATIO : aggregationBufferRatio;
+    this.useCombiner = useCombiner == null ? DEFAULT_USE_COMBINER : useCombiner.booleanValue();
   }

   @JsonProperty
@@ -216,6 +221,12 @@ public float getAggregationBufferRatio()
     return aggregationBufferRatio;
   }

+  @JsonProperty
+  public boolean getUseCombiner()
+  {
+    return useCombiner;
+  }
+
   public HadoopTuningConfig withWorkingPath(String path)
   {
     return new HadoopTuningConfig(
@@ -234,7 +245,8 @@ public HadoopTuningConfig withWorkingPath(String path)
         persistInHeap,
         ingestOffheap,
         bufferSize,
-        aggregationBufferRatio
+        aggregationBufferRatio,
+        useCombiner
     );
   }

@@ -256,7 +268,8 @@ public HadoopTuningConfig withVersion(String ver)
         persistInHeap,
         ingestOffheap,
         bufferSize,
-        aggregationBufferRatio
+        aggregationBufferRatio,
+        useCombiner
     );
   }

@@ -278,7 +291,8 @@ public HadoopTuningConfig withShardSpecs(Map<DateTime, List<HadoopyShardSpec>> s
         persistInHeap,
         ingestOffheap,
         bufferSize,
-        aggregationBufferRatio
+        aggregationBufferRatio,
+        useCombiner
     );
   }
 }
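The new flag surfaces in the ingestion spec as the tuningConfig property "useCombiner" (a boolean, defaulting to false via DEFAULT_USE_COMBINER), so enabling it amounts to adding "useCombiner": true to tuningConfig. How the flag is consumed is not shown in this excerpt; the sketch below illustrates the kind of job wiring it gates, with a stand-in combiner class so the snippet compiles on its own (the io.druid.indexer package is assumed):

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

import io.druid.indexer.HadoopTuningConfig;

public class CombinerWiringSketch
{
  // Stand-in combiner; a real one would partially aggregate rows on the map side.
  public static class PassThroughCombiner
      extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>
  {
  }

  public static void configureJob(Job job, HadoopTuningConfig tuningConfig)
  {
    if (tuningConfig.getUseCombiner()) {
      // Opt-in only: a combiner trades extra map-side CPU for less data shuffled to
      // the reducers, which is the trade-off the new tuning flag exposes.
      job.setCombinerClass(PassThroughCombiner.class);
    }
  }
}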