Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,12 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
<version>3.0.2</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.0.2</version>
</dependency>
<dependency>
<groupId>io.tesla.aether</groupId>
Expand Down
4 changes: 4 additions & 0 deletions processing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.metamx.common.logger.Logger;
import io.druid.data.input.impl.MapInputRowParser;
import com.metamx.common.parsers.ParseException;
import com.metamx.common.parsers.Parser;
import io.druid.data.input.impl.ParseSpec;

import java.io.InputStream;
Expand All @@ -46,7 +46,7 @@ public class ProtoBufInputRowParser implements ByteBufferInputRowParser
private static final Logger log = new Logger(ProtoBufInputRowParser.class);

private final ParseSpec parseSpec;
private final MapInputRowParser mapParser;
private Parser<String, Object> parser;
private final String descriptorFileInClasspath;

@JsonCreator
Expand All @@ -57,7 +57,7 @@ public ProtoBufInputRowParser(
{
this.parseSpec = parseSpec;
this.descriptorFileInClasspath = descriptorFileInClasspath;
this.mapParser = new MapInputRowParser(this.parseSpec);
this.parser = parseSpec.makeParser();
}

@Override
Expand All @@ -74,44 +74,28 @@ public ProtoBufInputRowParser withParseSpec(ParseSpec parseSpec)

@Override
public InputRow parse(ByteBuffer input)
{
// We should really create a ProtoBufBasedInputRow that does not need an intermediate map but accesses
// the DynamicMessage directly...
Map<String, Object> theMap = buildStringKeyMap(input);

return mapParser.parse(theMap);
}

private Map<String, Object> buildStringKeyMap(ByteBuffer input)
{
final Descriptor descriptor = getDescriptor(descriptorFileInClasspath);
final Map<String, Object> theMap = Maps.newHashMap();

DynamicMessage message;
try {
DynamicMessage message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input));
Map<Descriptors.FieldDescriptor, Object> allFields = message.getAllFields();

for (Map.Entry<Descriptors.FieldDescriptor, Object> entry : allFields.entrySet()) {
String name = entry.getKey().getName();
if (theMap.containsKey(name)) {
continue;
// Perhaps throw an exception here?
// throw new RuntimeException("dupicate key " + name + " in " + message);
}
Object value = entry.getValue();
if (value instanceof Descriptors.EnumValueDescriptor) {
Descriptors.EnumValueDescriptor desc = (Descriptors.EnumValueDescriptor) value;
value = desc.getName();
}

theMap.put(name, value);
}

message = DynamicMessage.parseFrom(descriptor, ByteString.copyFrom(input));
}
catch (InvalidProtocolBufferException e) {
throw new ParseException(e, "Invalid protobuf exception");
}
String json = null;
try {
json = JsonFormat.printer().print(message);
}
catch (InvalidProtocolBufferException e) {
log.warn(e, "Problem with protobuf something");
e.printStackTrace();
}
return theMap;
Map<String, Object> record = parser.parse(json);
return new MapBasedInputRow(
parseSpec.getTimestampSpec().extractTimestamp(record),
parseSpec.getDimensionsSpec().getDimensionNames(),
record
);
}

private Descriptor getDescriptor(String descriptorFileInClassPath)
Expand All @@ -121,7 +105,7 @@ private Descriptor getDescriptor(String descriptorFileInClassPath)
FileDescriptorSet set = FileDescriptorSet.parseFrom(fin);
FileDescriptor file = FileDescriptor.buildFrom(
set.getFile(0), new FileDescriptor[]
{}
{}
);
return file.getMessageTypes().get(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,86 +19,96 @@

package io.druid.data.input;

import com.google.common.collect.Lists;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.JSONPathFieldSpec;
import io.druid.data.input.impl.JSONPathFieldType;
import io.druid.data.input.impl.JSONPathSpec;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.data.input.impl.TimestampSpec;
import org.joda.time.DateTime;
import org.junit.Test;

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertEquals;

public class ProtoBufInputRowParserTest
{

public static final String[] DIMENSIONS = new String[]{"eventType", "id", "someOtherId", "isValid"};

/*
eventType = 1;

required uint64 id = 2;
required string timestamp = 3;
optional uint32 someOtherId = 4;
optional bool isValid = 5;
optional string description = 6;

optional float someFloatColumn = 7;
optional uint32 someIntColumn = 8;
optional uint64 someLongColumn = 9;
*/

@Test
public void testParse() throws Exception
{

//configure parser with desc file
ProtoBufInputRowParser parser = new ProtoBufInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList(DIMENSIONS)), Arrays.<String>asList(), null)
),
"prototest.desc"
ParseSpec parseSpec = new JSONParseSpec(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(Lists.<DimensionSchema>newArrayList(
new StringDimensionSchema("event"),
new StringDimensionSchema("id"),
new StringDimensionSchema("someOtherId"),
new StringDimensionSchema("isValid")
), null, null),
new JSONPathSpec(
true,
Lists.newArrayList(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "eventType", "eventType"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "foobar", "$.foo.bar"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "bar0", "$.bar[0].bar")
)
), null
);

//configure parser with desc file
ProtoBufInputRowParser parser = new ProtoBufInputRowParser(parseSpec, "prototest.desc");

//create binary of proto test event
DateTime dateTime = new DateTime(2012, 07, 12, 9, 30);
ProtoTestEventWrapper.ProtoTestEvent event = ProtoTestEventWrapper.ProtoTestEvent.newBuilder()
.setDescription("description")
.setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE)
.setId(4711L)
.setIsValid(true)
.setSomeOtherId(4712)
.setTimestamp(dateTime.toString())
.setSomeFloatColumn(47.11F)
.setSomeIntColumn(815)
.setSomeLongColumn(816L)
.build();
.setDescription("description")
.setEventType(ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE)
.setId(4711L)
.setIsValid(true)
.setSomeOtherId(4712)
.setTimestamp(dateTime.toString())
.setSomeFloatColumn(47.11F)
.setSomeIntColumn(815)
.setSomeLongColumn(816L)
.setFoo(ProtoTestEventWrapper.ProtoTestEvent.Foo
.newBuilder()
.setBar("baz"))
.addBar(ProtoTestEventWrapper.ProtoTestEvent.Foo
.newBuilder()
.setBar("bar0"))
.addBar(ProtoTestEventWrapper.ProtoTestEvent.Foo
.newBuilder()
.setBar("bar1"))
.build();

ByteArrayOutputStream out = new ByteArrayOutputStream();
event.writeTo(out);

InputRow row = parser.parse(ByteBuffer.wrap(out.toByteArray()));
System.out.println(row);

assertEquals(Arrays.asList(DIMENSIONS), row.getDimensions());
assertEquals(dateTime.getMillis(), row.getTimestampFromEpoch());

assertDimensionEquals(row, "id", "4711");
assertDimensionEquals(row, "isValid", "true");
assertDimensionEquals(row, "someOtherId", "4712");
assertDimensionEquals(row, "description", "description");

assertDimensionEquals(row, "eventType", ProtoTestEventWrapper.ProtoTestEvent.EventCategory.CATEGORY_ONE.name());
assertDimensionEquals(row, "foobar", "baz");
assertDimensionEquals(row, "bar0", "bar0");


assertEquals(47.11F, row.getFloatMetric("someFloatColumn"), 0.0);
assertEquals(815.0F, row.getFloatMetric("someIntColumn"), 0.0);
assertEquals(816.0F, row.getFloatMetric("someLongColumn"), 0.0);

}

private void assertDimensionEquals(InputRow row, String dimension, Object expected)
Expand Down
Loading