Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@
import io.druid.segment.TestHelper;
import io.druid.segment.column.DictionaryEncodedColumn;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.ExpressionTransform;
import io.druid.segment.indexing.TransformSpec;
import io.druid.segment.transform.ExpressionTransform;
import io.druid.segment.transform.TransformSpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.LocalDataSegmentPusher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import io.druid.java.util.common.JodaUtils;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.query.filter.DimFilter;
import io.druid.segment.indexing.TransformSpec;
import io.druid.segment.transform.TransformSpec;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.StorageAdapter;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.TransformSpec;
import io.druid.segment.transform.TransformSpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.LocalDataSegmentPuller;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.TestHelper;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.ExpressionTransform;
import io.druid.segment.indexing.TransformSpec;
import io.druid.segment.transform.ExpressionTransform;
import io.druid.segment.transform.TransformSpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import io.druid.query.filter.DimFilter;
import io.druid.segment.IndexIO;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.indexing.TransformSpec;
import io.druid.segment.transform.TransformSpec;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.segment.realtime.firehose.WindowedStorageAdapter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.ExpressionTransform;
import io.druid.segment.indexing.TransformSpec;
import io.druid.segment.transform.ExpressionTransform;
import io.druid.segment.transform.TransformSpec;
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.TestHelper;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.ExpressionTransform;
import io.druid.segment.transform.ExpressionTransform;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.TransformSpec;
import io.druid.segment.transform.TransformSpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@
import io.druid.segment.column.Column;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.indexing.ExpressionTransform;
import io.druid.segment.indexing.TransformSpec;
import io.druid.segment.transform.ExpressionTransform;
import io.druid.segment.transform.TransformSpec;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.TransformSpec;
import io.druid.segment.transform.TransformSpec;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.StorageLocationConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,44 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.common.guava.SettableSupplier;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.groupby.RowBasedColumnSelectorFactory;
import io.druid.segment.column.ValueType;
import io.druid.segment.transform.RowFunction;
import io.druid.segment.transform.Transform;
import io.druid.segment.transform.TransformSpec;
import io.druid.segment.transform.Transformer;
import org.joda.time.DateTime;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class DimFilterHavingSpec extends BaseHavingSpec
{
private static final boolean DEFAULT_FINALIZE = true;

private final DimFilter dimFilter;
private final SettableSupplier<Row> rowSupplier;
private final boolean finalize;

private ValueMatcher valueMatcher;
private Map<String, ValueType> rowSignature = new HashMap<>();
private Map<String, AggregatorFactory> aggregators = new HashMap<>();
private Transformer transformer = null;
private int evalCount;

@JsonCreator
public DimFilterHavingSpec(
@JsonProperty("filter") final DimFilter dimFilter
@JsonProperty("filter") final DimFilter dimFilter,
@JsonProperty("finalize") final Boolean finalize
)
{
this.dimFilter = Preconditions.checkNotNull(dimFilter, "filter");
this.rowSupplier = new SettableSupplier<>();
this.finalize = finalize == null ? DEFAULT_FINALIZE : finalize;
}

@JsonProperty("filter")
Expand All @@ -54,53 +68,184 @@ public DimFilter getDimFilter()
return dimFilter;
}

@JsonProperty
public boolean isFinalize()
{
return finalize;
}

@Override
public void setRowSignature(Map<String, ValueType> rowSignature)
{
this.valueMatcher = dimFilter.toFilter()
.makeMatcher(RowBasedColumnSelectorFactory.create(rowSupplier, rowSignature));
this.rowSignature = rowSignature;
}

@Override
public void setAggregators(final Map<String, AggregatorFactory> aggregators)
{
this.aggregators = aggregators;
}

@Override
public boolean eval(final Row row)
{
int oldEvalCount = evalCount;
evalCount++;
rowSupplier.set(row);
final boolean retVal = valueMatcher.matches();

if (transformer == null) {
transformer = createTransformer(dimFilter, rowSignature, aggregators, finalize);
}

final boolean retVal = transformer.transform(new RowAsInputRow(row)) != null;

if (evalCount != oldEvalCount + 1) {
// Oops, someone was using this from two different threads, bad caller.
throw new IllegalStateException("concurrent 'eval' calls not permitted!");
}

return retVal;
}

@Override
public boolean equals(Object o)
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

DimFilterHavingSpec that = (DimFilterHavingSpec) o;

return dimFilter.equals(that.dimFilter);
final DimFilterHavingSpec that = (DimFilterHavingSpec) o;
return finalize == that.finalize &&
Objects.equals(dimFilter, that.dimFilter);
}

@Override
public int hashCode()
{
return dimFilter.hashCode();
return Objects.hash(dimFilter, finalize);
}

@Override
public String toString()
{
return "DimFilterHavingSpec{" +
"dimFilter=" + dimFilter +
", finalize=" + finalize +
'}';
}

private static Transformer createTransformer(
final DimFilter filter,
final Map<String, ValueType> rowSignature,
final Map<String, AggregatorFactory> aggregators,
final boolean finalize
)
{
final List<Transform> transforms = new ArrayList<>();

if (finalize) {
for (AggregatorFactory aggregator : aggregators.values()) {
final String name = aggregator.getName();

transforms.add(
new Transform()
{
@Override
public String getName()
{
return name;
}

@Override
public RowFunction getRowFunction()
{
return row -> aggregator.finalizeComputation(row.getRaw(name));
}
}
);
}
}

return new TransformSpec(filter, transforms).toTransformer(rowSignature);
}

private static class RowAsInputRow implements InputRow
{
private final Row row;

public RowAsInputRow(final Row row)
{
this.row = row;
}

@Override
public List<String> getDimensions()
{
return Collections.emptyList();
}

@Override
public long getTimestampFromEpoch()
{
return row.getTimestampFromEpoch();
}

@Override
public DateTime getTimestamp()
{
return row.getTimestamp();
}

@Override
public List<String> getDimension(final String dimension)
{
return row.getDimension(dimension);
}

@Override
public Object getRaw(final String dimension)
{
return row.getRaw(dimension);
}

@Override
public Number getMetric(final String metric)
{
return row.getMetric(metric);
}

@Override
public int compareTo(final Row o)
{
return row.compareTo(o);
}

@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final RowAsInputRow that = (RowAsInputRow) o;
return Objects.equals(row, that.row);
}

@Override
public int hashCode()
{
return Objects.hash(row);
}

@Override
public String toString()
{
return "RowAsInputRow{" +
"row=" + row +
'}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.segment.indexing;
package io.druid.segment.transform;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.segment.indexing;
package io.druid.segment.transform;

import io.druid.data.input.Row;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.segment.indexing;
package io.druid.segment.transform;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
Expand Down
Loading