indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -64,6 +64,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Partitioner;
@@ -210,9 +211,17 @@ public boolean run()

boolean success = job.waitForCompletion(true);

Counter invalidRowCount = job.getCounters()
.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER);
jobStats.setInvalidRowCount(invalidRowCount.getValue());
Counters counters = job.getCounters();
if (counters == null) {
Contributor:

Why'd you need to add these? Why would the counters not be set?

Contributor Author (@jon-wei, Feb 1, 2018):

It happened on a task failure when I was testing the ParseExceptions; I didn't look further into why the job counters weren't set.

log.info("No counters found for job [%s]", job.getJobName());
} else {
Counter invalidRowCount = counters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER);
if (invalidRowCount != null) {
jobStats.setInvalidRowCount(invalidRowCount.getValue());
} else {
log.info("No invalid row counter found for job [%s]", job.getJobName());
}
}

return success;
}
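
Since the review thread above asks why the counters might be missing: here is a minimal, hypothetical sketch of the defensive pattern the new code follows, using only the Hadoop `Job`/`Counters`/`Counter` APIs already imported in this file. The helper name is illustrative and not part of the PR.

```java
// Hypothetical helper, not part of the PR: Job.getCounters() can return null
// (observed here after a task failure), and findCounter() may also yield null,
// so both are checked before reading the value.
private static long getInvalidRowCount(Job job) throws IOException
{
  Counters counters = job.getCounters();
  if (counters == null) {
    return 0L;
  }
  Counter invalidRowCount =
      counters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER);
  return invalidRowCount == null ? 0L : invalidRowCount.getValue();
}
```
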
@@ -258,6 +267,7 @@ public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesW

private AggregatorFactory[] aggregators;
private AggregatorFactory[] combiningAggs;
private Map<String, InputRowSerde.IndexSerdeTypeHelper> typeHelperMap;

@Override
protected void setup(Context context)
@@ -269,6 +279,11 @@ protected void setup(Context context)
for (int i = 0; i < aggregators.length; ++i) {
combiningAggs[i] = aggregators[i].getCombiningFactory();
}
typeHelperMap = InputRowSerde.getTypeHelperMap(config.getSchema()
.getDataSchema()
.getParser()
.getParseSpec()
.getDimensionsSpec());
}

@Override
@@ -299,9 +314,9 @@ protected void innerMap(
// and they contain the columns as they show up in the segment after ingestion, not what you would see in raw
// data
byte[] serializedInputRow = inputRow instanceof SegmentInputRow ?
InputRowSerde.toBytes(inputRow, combiningAggs, reportParseExceptions)
InputRowSerde.toBytes(typeHelperMap, inputRow, combiningAggs, reportParseExceptions)
:
InputRowSerde.toBytes(inputRow, aggregators, reportParseExceptions);
InputRowSerde.toBytes(typeHelperMap, inputRow, aggregators, reportParseExceptions);

context.write(
new SortableBytes(
@@ -322,6 +337,7 @@ public static class IndexGeneratorCombiner extends Reducer<BytesWritable, BytesW
private HadoopDruidIndexerConfig config;
private AggregatorFactory[] aggregators;
private AggregatorFactory[] combiningAggs;
private Map<String, InputRowSerde.IndexSerdeTypeHelper> typeHelperMap;

@Override
protected void setup(Context context)
@@ -334,6 +350,11 @@ protected void setup(Context context)
for (int i = 0; i < aggregators.length; ++i) {
combiningAggs[i] = aggregators[i].getCombiningFactory();
}
typeHelperMap = InputRowSerde.getTypeHelperMap(config.getSchema()
.getDataSchema()
.getParser()
.getParseSpec()
.getDimensionsSpec());
}

@Override
@@ -350,11 +371,11 @@ protected void reduce(
SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, null, null);
index.add(InputRowSerde.fromBytes(first.getBytes(), aggregators));
index.add(InputRowSerde.fromBytes(typeHelperMap, first.getBytes(), aggregators));

while (iter.hasNext()) {
context.progress();
InputRow value = InputRowSerde.fromBytes(iter.next().getBytes(), aggregators);
InputRow value = InputRowSerde.fromBytes(typeHelperMap, iter.next().getBytes(), aggregators);

if (!index.canAppendRow()) {
dimOrder.addAll(index.getDimensionOrder());
@@ -381,10 +402,13 @@ private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex ind
context.progress();
Row row = rows.next();
InputRow inputRow = getInputRowFromRow(row, dimensions);

// reportParseExceptions is true as any unparseable data is already handled by the mapper.
byte[] serializedRow = InputRowSerde.toBytes(typeHelperMap, inputRow, combiningAggs, true);

context.write(
key,
new BytesWritable(InputRowSerde.toBytes(inputRow, combiningAggs, true))
new BytesWritable(serializedRow)
);
}
index.close();
@@ -479,6 +503,7 @@ public static class IndexGeneratorReducer extends Reducer<BytesWritable, BytesWr
private List<String> metricNames = Lists.newArrayList();
private AggregatorFactory[] aggregators;
private AggregatorFactory[] combiningAggs;
private Map<String, InputRowSerde.IndexSerdeTypeHelper> typeHelperMap;

protected ProgressIndicator makeProgressIndicator(final Context context)
{
@@ -530,6 +555,11 @@ protected void setup(Context context)
metricNames.add(aggregators[i].getName());
combiningAggs[i] = aggregators[i].getCombiningFactory();
}
typeHelperMap = InputRowSerde.getTypeHelperMap(config.getSchema()
.getDataSchema()
.getParser()
.getParseSpec()
.getDimensionsSpec());
}

@Override
@@ -597,7 +627,7 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
for (final BytesWritable bw : values) {
context.progress();

final InputRow inputRow = index.formatRow(InputRowSerde.fromBytes(bw.getBytes(), aggregators));
final InputRow inputRow = index.formatRow(InputRowSerde.fromBytes(typeHelperMap, bw.getBytes(), aggregators));
int numRows = index.add(inputRow);

++lineCount;
203 changes: 193 additions & 10 deletions indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java
@@ -22,17 +22,24 @@
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;

import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.Rows;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.DimensionHandlerUtils;
import io.druid.segment.VirtualColumns;
import io.druid.segment.column.ValueType;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics;
@@ -49,7 +56,165 @@ public class InputRowSerde
{
private static final Logger log = new Logger(InputRowSerde.class);

public static final byte[] toBytes(final InputRow row, AggregatorFactory[] aggs, boolean reportParseExceptions)
private static final IndexSerdeTypeHelper STRING_HELPER = new StringIndexSerdeTypeHelper();
private static final IndexSerdeTypeHelper LONG_HELPER = new LongIndexSerdeTypeHelper();
private static final IndexSerdeTypeHelper FLOAT_HELPER = new FloatIndexSerdeTypeHelper();
private static final IndexSerdeTypeHelper DOUBLE_HELPER = new DoubleIndexSerdeTypeHelper();

public interface IndexSerdeTypeHelper<T>
{
ValueType getType();

void serialize(ByteArrayDataOutput out, Object value, boolean reportParseExceptions);

T deserialize(ByteArrayDataInput in);
}

public static Map<String, IndexSerdeTypeHelper> getTypeHelperMap(DimensionsSpec dimensionsSpec)
{
Map<String, IndexSerdeTypeHelper> typeHelperMap = Maps.newHashMap();
for (DimensionSchema dimensionSchema : dimensionsSpec.getDimensions()) {
IndexSerdeTypeHelper typeHelper;
switch (dimensionSchema.getValueType()) {
Contributor:

Replace the switch with VALUE_TYPE_HELPER_ARRAY[dimensionSchema.getValueType().ordinal()] ?

Contributor Author:

I kept the switch statement but dropped the array; it wasn't being used anymore after removing the type ordinals from the serialized form.

case STRING:
typeHelper = STRING_HELPER;
break;
case LONG:
typeHelper = LONG_HELPER;
break;
case FLOAT:
typeHelper = FLOAT_HELPER;
break;
case DOUBLE:
typeHelper = DOUBLE_HELPER;
break;
default:
throw new IAE("Invalid type: [%s]", dimensionSchema.getValueType());
}
typeHelperMap.put(dimensionSchema.getName(), typeHelper);
}
return typeHelperMap;
}
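
For reference, a hypothetical sketch of the array-lookup alternative suggested in the review comment above; the PR kept the switch instead, since the array was no longer needed once type ordinals were dropped from the serialized form.

```java
// Hypothetical alternative (not what was merged): index the helpers by
// ValueType ordinal and look them up directly instead of switching.
private static final IndexSerdeTypeHelper[] VALUE_TYPE_HELPER_ARRAY =
    new IndexSerdeTypeHelper[ValueType.values().length];

static {
  VALUE_TYPE_HELPER_ARRAY[ValueType.STRING.ordinal()] = STRING_HELPER;
  VALUE_TYPE_HELPER_ARRAY[ValueType.LONG.ordinal()] = LONG_HELPER;
  VALUE_TYPE_HELPER_ARRAY[ValueType.FLOAT.ordinal()] = FLOAT_HELPER;
  VALUE_TYPE_HELPER_ARRAY[ValueType.DOUBLE.ordinal()] = DOUBLE_HELPER;
}

private static IndexSerdeTypeHelper lookupTypeHelper(ValueType valueType)
{
  IndexSerdeTypeHelper helper = VALUE_TYPE_HELPER_ARRAY[valueType.ordinal()];
  if (helper == null) {
    // COMPLEX (and any future types) have no dimension serde helper.
    throw new IAE("Invalid type: [%s]", valueType);
  }
  return helper;
}
```
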

public static class StringIndexSerdeTypeHelper implements IndexSerdeTypeHelper<List<String>>
{
@Override
public ValueType getType()
{
return ValueType.STRING;
}

@Override
public void serialize(ByteArrayDataOutput out, Object value, boolean reportParseExceptions)
{
List<String> values = Rows.objectToStrings(value);
try {
writeStringArray(values, out);
}
catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}

@Override
public List<String> deserialize(ByteArrayDataInput in)
{
try {
return readStringArray(in);
}
catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
}

public static class LongIndexSerdeTypeHelper implements IndexSerdeTypeHelper<Long>
{
@Override
public ValueType getType()
{
return ValueType.LONG;
}

@Override
public void serialize(ByteArrayDataOutput out, Object value, boolean reportParseExceptions)
{
Long ret = DimensionHandlerUtils.convertObjectToLong(value, reportParseExceptions);
if (ret == null) {
// remove null -> zero conversion when https://github.com/druid-io/druid/pull/5278 series of patches is merged
// we'll also need to change the serialized encoding so that it can represent numeric nulls
ret = DimensionHandlerUtils.ZERO_LONG;
}
out.writeLong(ret);
}

@Override
public Long deserialize(ByteArrayDataInput in)
{
return in.readLong();
}
}

public static class FloatIndexSerdeTypeHelper implements IndexSerdeTypeHelper<Float>
{
@Override
public ValueType getType()
{
return ValueType.FLOAT;
}

@Override
public void serialize(ByteArrayDataOutput out, Object value, boolean reportParseExceptions)
{
Float ret = DimensionHandlerUtils.convertObjectToFloat(value, reportParseExceptions);
if (ret == null) {
// remove null -> zero conversion when https://github.com/druid-io/druid/pull/5278 series of patches is merged
// we'll also need to change the serialized encoding so that it can represent numeric nulls
ret = DimensionHandlerUtils.ZERO_FLOAT;
}
out.writeFloat(ret);
}

@Override
public Float deserialize(ByteArrayDataInput in)
{
return in.readFloat();
}
}

public static class DoubleIndexSerdeTypeHelper implements IndexSerdeTypeHelper<Double>
{
@Override
public ValueType getType()
{
return ValueType.DOUBLE;
}

@Override
public void serialize(ByteArrayDataOutput out, Object value, boolean reportParseExceptions)
{
Double ret = DimensionHandlerUtils.convertObjectToDouble(value, reportParseExceptions);
if (ret == null) {
// remove null -> zero conversion when https://github.com/druid-io/druid/pull/5278 series of patches is merged
// we'll also need to change the serialized encoding so that it can represent numeric nulls
ret = DimensionHandlerUtils.ZERO_DOUBLE;
}
out.writeDouble(ret);
}

@Override
public Double deserialize(ByteArrayDataInput in)
{
return in.readDouble();
}
}

public static final byte[] toBytes(
final Map<String, IndexSerdeTypeHelper> typeHelperMap,
final InputRow row,
AggregatorFactory[] aggs,
boolean reportParseExceptions
)
{
try {
ByteArrayDataOutput out = ByteStreams.newDataOutput();
@@ -63,9 +228,12 @@ public static final byte[] toBytes(final InputRow row, AggregatorFactory[] aggs,
WritableUtils.writeVInt(out, dimList.size());
if (dimList != null) {
for (String dim : dimList) {
List<String> dimValues = row.getDimension(dim);
IndexSerdeTypeHelper typeHelper = typeHelperMap.get(dim);
if (typeHelper == null) {
typeHelper = STRING_HELPER;
}
writeString(dim, out);
writeStringArray(dimValues, out);
typeHelper.serialize(out, row.getRaw(dim), reportParseExceptions);
}
}

@@ -176,10 +344,14 @@ private static List<String> readStringArray(DataInput in) throws IOException
return values;
}

public static final InputRow fromBytes(byte[] data, AggregatorFactory[] aggs)
public static final InputRow fromBytes(
final Map<String, IndexSerdeTypeHelper> typeHelperMap,
byte[] data,
AggregatorFactory[] aggs
)
{
try {
DataInput in = ByteStreams.newDataInput(data);
ByteArrayDataInput in = ByteStreams.newDataInput(data);

//Read timestamp
long timestamp = in.readLong();
@@ -192,14 +364,25 @@ public static final InputRow fromBytes(byte[] data, AggregatorFactory[] aggs)
for (int i = 0; i < dimNum; i++) {
String dimension = readString(in);
dimensions.add(dimension);
List<String> dimensionValues = readStringArray(in);
if (dimensionValues == null) {

IndexSerdeTypeHelper typeHelper = typeHelperMap.get(dimension);
if (typeHelper == null) {
typeHelper = STRING_HELPER;
}
Object dimValues = typeHelper.deserialize(in);
if (dimValues == null) {
continue;
}
if (dimensionValues.size() == 1) {
event.put(dimension, dimensionValues.get(0));

if (typeHelper.getType() == ValueType.STRING) {
List<String> dimensionValues = (List<String>) dimValues;
if (dimensionValues.size() == 1) {
event.put(dimension, dimensionValues.get(0));
} else {
event.put(dimension, dimensionValues);
}
} else {
event.put(dimension, dimensionValues);
event.put(dimension, dimValues);
}
}
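
A hedged usage sketch of the new signatures end to end: `getTypeHelperMap`, `toBytes`, and `fromBytes` match the methods in this diff, while the `DimensionsSpec`, dimension-schema, and aggregator constructors below are assumptions about this Druid version and may need adjusting.

```java
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.LongDimensionSchema;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;

import java.util.Map;

// Build the per-dimension helpers once (the mapper/combiner/reducer setup() methods
// do this from the ingestion spec), then reuse them for every row.
DimensionsSpec dimensionsSpec = new DimensionsSpec(
    ImmutableList.<DimensionSchema>of(
        new StringDimensionSchema("page"),
        new LongDimensionSchema("clicks")
    ),
    null,
    null
);
Map<String, InputRowSerde.IndexSerdeTypeHelper> typeHelperMap =
    InputRowSerde.getTypeHelperMap(dimensionsSpec);

AggregatorFactory[] aggs = new AggregatorFactory[]{new LongSumAggregatorFactory("metSum", "met")};

InputRow row = new MapBasedInputRow(
    System.currentTimeMillis(),
    ImmutableList.of("page", "clicks"),
    ImmutableMap.<String, Object>of("page", "index.html", "clicks", 3L, "met", 10L)
);

// Round trip: "clicks" is serialized as a long via the LONG helper, "page" as a string list.
byte[] bytes = InputRowSerde.toBytes(typeHelperMap, row, aggs, true);
InputRow roundTripped = InputRowSerde.fromBytes(typeHelperMap, bytes, aggs);
```
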
