Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class InputRowSchema
private final TimestampSpec timestampSpec;
private final DimensionsSpec dimensionsSpec;
private final ColumnsFilter columnsFilter;

/**
* Set of metric names for further downstream processing by {@link InputSource}.
* Empty set if no metric given.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ public class DimensionsSpec
private final Map<String, DimensionSchema> dimensionSchemaMap;
private final boolean includeAllDimensions;

public static final DimensionsSpec EMPTY = new DimensionsSpec(null, null, null, false);
private final boolean useNestedColumnIndexerForSchemaDiscovery;

public static final DimensionsSpec EMPTY = new DimensionsSpec(null, null, null, false, null);

public static List<DimensionSchema> getDefaultSchemas(List<String> dimNames)
{
Expand Down Expand Up @@ -78,15 +80,16 @@ public static Builder builder()

public DimensionsSpec(List<DimensionSchema> dimensions)
{
this(dimensions, null, null, false);
this(dimensions, null, null, false, null);
}

@JsonCreator
private DimensionsSpec(
@JsonProperty("dimensions") List<DimensionSchema> dimensions,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions,
@Deprecated @JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions,
@JsonProperty("includeAllDimensions") boolean includeAllDimensions
@JsonProperty("includeAllDimensions") boolean includeAllDimensions,
@JsonProperty("useNestedColumnIndexerForSchemaDiscovery") Boolean useNestedColumnIndexerForSchemaDiscovery
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about a shorter name? "discoverNested"?

)
{
this.dimensions = dimensions == null
Expand Down Expand Up @@ -115,6 +118,8 @@ private DimensionsSpec(
dimensionSchemaMap.put(newSchema.getName(), newSchema);
}
this.includeAllDimensions = includeAllDimensions;
this.useNestedColumnIndexerForSchemaDiscovery =
useNestedColumnIndexerForSchemaDiscovery != null && useNestedColumnIndexerForSchemaDiscovery;
}

@JsonProperty
Expand All @@ -135,6 +140,12 @@ public boolean isIncludeAllDimensions()
return includeAllDimensions;
}

/**
* Returns the schema-discovery flag held by this spec. The constructor stores {@code true} only when the
* {@code useNestedColumnIndexerForSchemaDiscovery} argument is non-null and true; a missing (null) value is
* stored as {@code false}.
*
* @return the stored {@code useNestedColumnIndexerForSchemaDiscovery} value
*/
@JsonProperty
public boolean useNestedColumnIndexerForSchemaDiscovery()
{
return useNestedColumnIndexerForSchemaDiscovery;
}

@Deprecated
@JsonIgnore
public List<SpatialDimensionSchema> getSpatialDimensions()
Expand Down Expand Up @@ -188,7 +199,13 @@ public boolean hasCustomDimensions()
@PublicApi
public DimensionsSpec withDimensions(List<DimensionSchema> dims)
{
return new DimensionsSpec(dims, ImmutableList.copyOf(dimensionExclusions), null, includeAllDimensions);
return new DimensionsSpec(
dims,
ImmutableList.copyOf(dimensionExclusions),
null,
includeAllDimensions,
useNestedColumnIndexerForSchemaDiscovery
);
}

public DimensionsSpec withDimensionExclusions(Set<String> dimExs)
Expand All @@ -197,14 +214,21 @@ public DimensionsSpec withDimensionExclusions(Set<String> dimExs)
dimensions,
ImmutableList.copyOf(Sets.union(dimensionExclusions, dimExs)),
null,
includeAllDimensions
includeAllDimensions,
useNestedColumnIndexerForSchemaDiscovery
);
}

@Deprecated
public DimensionsSpec withSpatialDimensions(List<SpatialDimensionSchema> spatials)
{
return new DimensionsSpec(dimensions, ImmutableList.copyOf(dimensionExclusions), spatials, includeAllDimensions);
return new DimensionsSpec(
dimensions,
ImmutableList.copyOf(dimensionExclusions),
spatials,
includeAllDimensions,
useNestedColumnIndexerForSchemaDiscovery
);
}

private void verify(List<SpatialDimensionSchema> spatialDimensions)
Expand Down Expand Up @@ -243,14 +267,20 @@ public boolean equals(Object o)
}
DimensionsSpec that = (DimensionsSpec) o;
return includeAllDimensions == that.includeAllDimensions
&& useNestedColumnIndexerForSchemaDiscovery == that.useNestedColumnIndexerForSchemaDiscovery
&& Objects.equals(dimensions, that.dimensions)
&& Objects.equals(dimensionExclusions, that.dimensionExclusions);
}

@Override
public int hashCode()
{
return Objects.hash(dimensions, dimensionExclusions, includeAllDimensions);
return Objects.hash(
dimensions,
dimensionExclusions,
includeAllDimensions,
useNestedColumnIndexerForSchemaDiscovery
);
}

@Override
Expand All @@ -260,6 +290,7 @@ public String toString()
"dimensions=" + dimensions +
", dimensionExclusions=" + dimensionExclusions +
", includeAllDimensions=" + includeAllDimensions +
", useNestedColumnIndexerForSchemaDiscovery=" + useNestedColumnIndexerForSchemaDiscovery +
'}';
}

Expand All @@ -270,6 +301,8 @@ public static final class Builder
private List<SpatialDimensionSchema> spatialDimensions;
private boolean includeAllDimensions;

private boolean useNestedColumnIndexerForSchemaDiscovery;

public Builder setDimensions(List<DimensionSchema> dimensions)
{
this.dimensions = dimensions;
Expand Down Expand Up @@ -301,9 +334,21 @@ public Builder setIncludeAllDimensions(boolean includeAllDimensions)
return this;
}

/**
* Sets the flag that {@link #build()} passes to the {@link DimensionsSpec} constructor as its
* {@code useNestedColumnIndexerForSchemaDiscovery} argument.
*
* @param useNestedColumnIndexerForSchemaDiscovery value to store in this builder
* @return this builder, so calls can be chained
*/
public Builder setUseNestedColumnIndexerForSchemaDiscovery(boolean useNestedColumnIndexerForSchemaDiscovery)
{
this.useNestedColumnIndexerForSchemaDiscovery = useNestedColumnIndexerForSchemaDiscovery;
return this;
}

public DimensionsSpec build()
{
return new DimensionsSpec(dimensions, dimensionExclusions, spatialDimensions, includeAllDimensions);
return new DimensionsSpec(
dimensions,
dimensionExclusions,
spatialDimensions,
includeAllDimensions,
useNestedColumnIndexerForSchemaDiscovery
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,13 @@ public class JsonLineReader extends TextReader
)
{
super(inputRowSchema, source);
this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
this.flattener = ObjectFlatteners.create(
flattenSpec,
new JSONFlattenerMaker(
keepNullColumns,
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
)
);
this.mapper = mapper;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,13 @@ public class JsonNodeReader extends IntermediateRowParsingReader<JsonNode>
{
this.inputRowSchema = inputRowSchema;
this.source = source;
this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
this.flattener = ObjectFlatteners.create(
flattenSpec,
new JSONFlattenerMaker(
keepNullColumns,
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
)
);
this.mapper = mapper;
this.jsonFactory = new JsonFactory();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,13 @@ public class JsonReader extends IntermediateRowParsingReader<String>
{
this.inputRowSchema = inputRowSchema;
this.source = source;
this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
this.flattener = ObjectFlatteners.create(
flattenSpec,
new JSONFlattenerMaker(
keepNullColumns,
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
)
);
this.mapper = mapper;
this.jsonFactory = new JsonFactory();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,23 @@ public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker<JsonN
private final CharsetEncoder enc = StandardCharsets.UTF_8.newEncoder();
private final boolean keepNullValues;

private final boolean discoverNestedFields;

public JSONFlattenerMaker(boolean keepNullValues)

/**
* @param keepNullValues       stored on this instance; NOTE(review): presumably controls whether null values are
*                             retained during flattening - confirm against the methods that read it
* @param discoverNestedFields if true, {@link #discoverRootFields(JsonNode)} returns every root field name; if
*                             false, that method filters the root fields before returning them
*/
public JSONFlattenerMaker(boolean keepNullValues, boolean discoverNestedFields)
{
this.keepNullValues = keepNullValues;
this.discoverNestedFields = discoverNestedFields;
}

@Override
public Iterable<String> discoverRootFields(final JsonNode obj)
{
// if discovering nested fields, just return all root fields since we want everything
// else, we filter for literals and arrays of literals
if (discoverNestedFields) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you drop a one-line comment here? I am assuming it's like this since each top-level field is a field of its own if we are allowing nested columns.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you want similar comments for all the other FlattenerMaker implementations?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

went ahead and added comments to all

return obj::fieldNames;
}
return FluentIterable.from(obj::fields)
.filter(
entry -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ public class JSONPathParser implements Parser<String, Object>
public JSONPathParser(JSONPathSpec flattenSpec, ObjectMapper mapper, boolean keepNullColumns)
{
this.mapper = mapper == null ? new ObjectMapper() : mapper;
this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
this.flattener = ObjectFlatteners.create(
flattenSpec,
new JSONFlattenerMaker(keepNullColumns, false)
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ public interface FlattenerMaker<T>
{
JsonProvider getJsonProvider();
/**
* List all "root" primitive properties and primitive lists (no nested objects, no lists of objects)
* List all "root" fields. If
* {@link org.apache.druid.data.input.impl.DimensionsSpec#useNestedColumnIndexerForSchemaDiscovery} is false, this
* method should filter fields to include only fields that contain primitive values or lists of primitive values
*/
Iterable<String> discoverRootFields(T obj);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.fasterxml.jackson.databind.node.BinaryNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -37,7 +38,8 @@ public class JSONFlattenerMakerTest
{
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private static final JSONFlattenerMaker FLATTENER_MAKER = new JSONFlattenerMaker(true);
private static final JSONFlattenerMaker FLATTENER_MAKER = new JSONFlattenerMaker(true, false);
private static final JSONFlattenerMaker FLATTENER_MAKER_NESTED = new JSONFlattenerMaker(true, true);

@Test
public void testStrings() throws JsonProcessingException
Expand Down Expand Up @@ -169,4 +171,32 @@ public void testNested() throws JsonProcessingException
result = FLATTENER_MAKER.finalizeConversionForMap(node);
Assert.assertEquals(expectedList, result);
}

/**
* Checks {@code discoverRootFields} for both flattener makers against the same document: one built with
* nested discovery disabled ({@code FLATTENER_MAKER}) and one with it enabled ({@code FLATTENER_MAKER_NESTED}).
*/
@Test
public void testDiscovery() throws JsonProcessingException
{
// One root field per kind of value: scalars, binary, two lists of literals, and a single nested object.
Map<String, Object> theMap =
ImmutableMap.<String, Object>builder()
.put("bool", true)
.put("int", 1)
.put("long", 1L)
.put("float", 0.11f)
.put("double", 0.33)
.put("binary", new byte[]{0x01, 0x02, 0x03})
.put("list", ImmutableList.of("foo", "bar", "baz"))
.put("anotherList", ImmutableList.of(1, 2, 3))
.put("nested", ImmutableMap.of("x", 1L, "y", 2L, "z", 3L))
.build();

// Serialize and re-parse so the flattener makers are given a JsonNode tree rather than the Java map.
JsonNode node = OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsString(theMap));
Assert.assertTrue(node.isObject());
// Nested discovery disabled: every root field except the object-valued "nested" is expected.
Assert.assertEquals(
ImmutableSet.of("bool", "int", "long", "float", "double", "binary", "list", "anotherList"),
ImmutableSet.copyOf(FLATTENER_MAKER.discoverRootFields(node))
);
// Nested discovery enabled: all root fields are expected, including "nested".
Assert.assertEquals(
ImmutableSet.of("bool", "int", "long", "float", "double", "binary", "list", "anotherList", "nested"),
ImmutableSet.copyOf(FLATTENER_MAKER_NESTED.discoverRootFields(node))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class ObjectFlattenersTest
{
private static final String SOME_JSON = "{\"foo\": null, \"bar\": 1}";

private static final ObjectFlatteners.FlattenerMaker FLATTENER_MAKER = new JSONFlattenerMaker(true);
private static final ObjectFlatteners.FlattenerMaker FLATTENER_MAKER = new JSONFlattenerMaker(true, false);
private static final ObjectFlattener FLATTENER = ObjectFlatteners.create(
new JSONPathSpec(
true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,27 @@ private static boolean isFieldPrimitive(Schema.Field field)
private final boolean fromPigAvroStorage;
private final boolean binaryAsString;

private final boolean discoverNestedFields;

/**
* @param fromPigAvroStorage boolean to specify the data file is stored using AvroStorage
* @param binaryAsString boolean to encode the byte[] as a string.
* @param fromPigAvroStorage boolean to specify the data file is stored using AvroStorage
* @param binaryAsString if true, treat byte[] as utf8 encoded values and coerce to strings, else leave as byte[]
* @param extractUnionsByType if true, unions will be extracted to separate nested fields for each type. See
* {@link GenericAvroJsonProvider#extractUnionTypes(Object)} for more details
* @param discoverNestedFields if true, {@link #discoverRootFields(GenericRecord)} will return the full set of
* fields, else this list will be filtered to contain only simple literals and arrays
* of simple literals
*/
public AvroFlattenerMaker(final boolean fromPigAvroStorage, final boolean binaryAsString, final boolean extractUnionsByType)
public AvroFlattenerMaker(
final boolean fromPigAvroStorage,
final boolean binaryAsString,
final boolean extractUnionsByType,
final boolean discoverNestedFields
)
{
this.fromPigAvroStorage = fromPigAvroStorage;
this.binaryAsString = binaryAsString;
this.discoverNestedFields = discoverNestedFields;

this.avroJsonProvider = new GenericAvroJsonProvider(extractUnionsByType);
this.jsonPathConfiguration =
Expand All @@ -113,6 +126,11 @@ public AvroFlattenerMaker(final boolean fromPigAvroStorage, final boolean binary
@Override
public Set<String> discoverRootFields(final GenericRecord obj)
{
// if discovering nested fields, just return all root fields since we want everything
// else, we filter for literals and arrays of literals
if (discoverNestedFields) {
return obj.getSchema().getFields().stream().map(Schema.Field::name).collect(Collectors.toSet());
}
return obj.getSchema()
.getFields()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,15 @@ public class AvroOCFReader extends IntermediateRowParsingReader<GenericRecord>
this.source = source;
this.temporaryDirectory = temporaryDirectory;
this.readerSchema = readerSchema;
this.recordFlattener = ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(false, binaryAsString, extractUnionsByType));
this.recordFlattener = ObjectFlatteners.create(
flattenSpec,
new AvroFlattenerMaker(
false,
binaryAsString,
extractUnionsByType,
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
)
);
}

@Override
Expand Down
Loading