Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class DimensionsSpec
{
private final List<DimensionSchema> dimensions;
private final Set<String> dimensionExclusions;
private final Boolean includeTruncatedTimestampColumnAsDimension;
private final Map<String, DimensionSchema> dimensionSchemaMap;

public static List<DimensionSchema> getDefaultSchemas(List<String> dimNames)
Expand All @@ -65,6 +66,7 @@ public static DimensionSchema convertSpatialSchema(SpatialDimensionSchema spatia
public DimensionsSpec(
@JsonProperty("dimensions") List<DimensionSchema> dimensions,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions,
@JsonProperty("includeTruncatedTimestampColumnAsDimension") Boolean includeTruncatedTimestampColumnAsDimension,
@Deprecated @JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions
)
{
Expand All @@ -76,6 +78,10 @@ public DimensionsSpec(
? Sets.<String>newHashSet()
: Sets.newHashSet(dimensionExclusions);

this.includeTruncatedTimestampColumnAsDimension = includeTruncatedTimestampColumnAsDimension == null
? false
: includeTruncatedTimestampColumnAsDimension;

List<SpatialDimensionSchema> spatialDims = (spatialDimensions == null)
? Lists.<SpatialDimensionSchema>newArrayList()
: spatialDimensions;
Expand All @@ -88,13 +94,21 @@ public DimensionsSpec(
dimensionSchemaMap.put(schema.getName(), schema);
}

for(SpatialDimensionSchema spatialSchema : spatialDims) {
for (SpatialDimensionSchema spatialSchema : spatialDims) {
DimensionSchema newSchema = DimensionsSpec.convertSpatialSchema(spatialSchema);
this.dimensions.add(newSchema);
dimensionSchemaMap.put(newSchema.getName(), newSchema);
}
}

public DimensionsSpec(
List<DimensionSchema> dimensions,
List<String> dimensionExclusions,
List<SpatialDimensionSchema> spatialDimensions
)
{
this(dimensions, dimensionExclusions, false, spatialDimensions);
}

@JsonProperty
public List<DimensionSchema> getDimensions()
Expand All @@ -108,6 +122,12 @@ public Set<String> getDimensionExclusions()
return dimensionExclusions;
}

@JsonProperty
public Boolean isIncludeTruncatedTimestampColumnAsDimension()
{
return includeTruncatedTimestampColumnAsDimension;
}

@Deprecated @JsonIgnore
public List<SpatialDimensionSchema> getSpatialDimensions()
{
Expand Down Expand Up @@ -220,15 +240,18 @@ public boolean equals(Object o)
if (!dimensions.equals(that.dimensions)) {
return false;
}

return dimensionExclusions.equals(that.dimensionExclusions);
if (!dimensionExclusions.equals(that.dimensionExclusions)) {
return false;
}
return includeTruncatedTimestampColumnAsDimension.equals(that.includeTruncatedTimestampColumnAsDimension);
}

@Override
public int hashCode()
{
int result = dimensions.hashCode();
result = 31 * result + dimensionExclusions.hashCode();
result = 31 * result + includeTruncatedTimestampColumnAsDimension.hashCode();
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.data.input.impl.InputRowParser;
import io.druid.indexer.hadoop.SegmentInputRow;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.BaseProgressIndicator;
Expand Down Expand Up @@ -220,13 +221,15 @@ private static IncrementalIndex makeIncrementalIndex(
)
{
final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
InputRowParser parser = config.getSchema().getDataSchema().getParser();
final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(theBucket.time.getMillis())
.withTimestampSpec(config.getSchema().getDataSchema().getParser().getParseSpec().getTimestampSpec())
.withDimensionsSpec(config.getSchema().getDataSchema().getParser())
.withTimestampSpec(parser)
.withDimensionsSpec(parser)
.withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
.withMetrics(aggs)
.withRollup(config.getSchema().getDataSchema().getGranularitySpec().isRollup())
.withIncludeTruncatedTimestampColumnAsDimension(parser)
.build();

OnheapIncrementalIndex newIndex = new OnheapIncrementalIndex(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,14 @@ public IncrementalIndex(
if (!spatialDimensions.isEmpty()) {
this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions));
}

if (incrementalIndexSchema.isIncludeTruncatedTimestampColumnAsDimension()
&& metadata.getTimestampSpec() != null
&& columnCapabilities.containsKey(metadata.getTimestampSpec().getTimestampColumn())) {
this.rowTransformers.add(
new TruncateTimestampColumnRowTransformer(gran, metadata)
);
}
}

private DimDim newDimDim(String dimension, ValueType type)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,16 @@ public class IncrementalIndexSchema
private final DimensionsSpec dimensionsSpec;
private final AggregatorFactory[] metrics;
private final boolean rollup;
private final boolean includeTruncatedTimestampColumnAsDimension;

public IncrementalIndexSchema(
long minTimestamp,
TimestampSpec timestampSpec,
QueryGranularity gran,
DimensionsSpec dimensionsSpec,
AggregatorFactory[] metrics,
boolean rollup
boolean rollup,
boolean includeTruncatedTimestampColumnAsDimension
)
{
this.minTimestamp = minTimestamp;
Expand All @@ -53,6 +55,7 @@ public IncrementalIndexSchema(
this.dimensionsSpec = dimensionsSpec;
this.metrics = metrics;
this.rollup = rollup;
this.includeTruncatedTimestampColumnAsDimension = includeTruncatedTimestampColumnAsDimension;
}

public long getMinTimestamp()
Expand Down Expand Up @@ -85,6 +88,11 @@ public boolean isRollup()
return rollup;
}

public boolean isIncludeTruncatedTimestampColumnAsDimension()
{
return includeTruncatedTimestampColumnAsDimension;
}

public static class Builder
{
private long minTimestamp;
Expand All @@ -93,6 +101,7 @@ public static class Builder
private DimensionsSpec dimensionsSpec;
private AggregatorFactory[] metrics;
private boolean rollup;
private boolean includeTruncatedTimestampColumnAsDimension;

public Builder()
{
Expand All @@ -101,6 +110,7 @@ public Builder()
this.dimensionsSpec = new DimensionsSpec(null, null, null);
this.metrics = new AggregatorFactory[]{};
this.rollup = true;
this.includeTruncatedTimestampColumnAsDimension = false;
}

public Builder withMinTimestamp(long minTimestamp)
Expand All @@ -109,6 +119,25 @@ public Builder withMinTimestamp(long minTimestamp)
return this;
}

public Builder withIncludeTruncatedTimestampColumnAsDimension(boolean includeTruncatedTimestampColumnAsDimension)
{
this.includeTruncatedTimestampColumnAsDimension = includeTruncatedTimestampColumnAsDimension;
return this;
}

public Builder withIncludeTruncatedTimestampColumnAsDimension(InputRowParser parser)
{
if (parser != null
&& parser.getParseSpec() != null
&& parser.getParseSpec().getDimensionsSpec() != null) {
this.includeTruncatedTimestampColumnAsDimension = parser.getParseSpec().getDimensionsSpec()
.isIncludeTruncatedTimestampColumnAsDimension();
} else {
this.includeTruncatedTimestampColumnAsDimension = false;
}
return this;
}

public Builder withTimestampSpec(TimestampSpec timestampSpec)
{
this.timestampSpec = timestampSpec;
Expand Down Expand Up @@ -167,7 +196,7 @@ public Builder withRollup(boolean rollup)
public IncrementalIndexSchema build()
{
return new IncrementalIndexSchema(
minTimestamp, timestampSpec, gran, dimensionsSpec, metrics, rollup
minTimestamp, timestampSpec, gran, dimensionsSpec, metrics, rollup, includeTruncatedTimestampColumnAsDimension
);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.segment.incremental;

import com.google.common.base.Function;
import com.metamx.common.IAE;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;

public class TimestampFormatter
{
public static Function<Long, String> createTimestampFormatter(
final String format
)
{
if (format.equalsIgnoreCase("millis")) {
return new Function<Long, String>()
{
@Override
public String apply(Long input)
{
return "" + input;
}
};
} else if (format.equalsIgnoreCase("posix")) {
return new Function<Long, String>()
{
@Override
public String apply(Long input)
{
return "" + input / 1000;
}
};
} else if (format.equalsIgnoreCase("nano")) {
return new Function<Long, String>()
{
@Override
public String apply(Long input)
{
return "" + input * 1000000L;
}
};
} else if (format.equalsIgnoreCase("ruby")) {
return new Function<Long, String>()
{
@Override
public String apply(Long input)
{
return String.format("%d.%3d000", input / 1000, input % 1000);
}
};
} else if (format.equalsIgnoreCase("iso")) {
final DateTimeFormatter formatter = ISODateTimeFormat.dateTime();
return new Function<Long, String>()
{
@Override
public String apply(Long input)
{
return formatter.print((input));
}
};
} else {
try {
final DateTimeFormatter formatter = DateTimeFormat.forPattern(format);
return new Function<Long, String>()
{
@Override
public String apply(Long input)
{
return formatter.print(input);
}
};
}
catch (Exception e) {
throw new IAE(e, "Unable to format timestamps with format [%s]", format);
}
}
}
}
Loading