Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.indexing.DataSchema;
Expand Down Expand Up @@ -66,18 +65,19 @@ public static Task getTask()
null,
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(
"foo",
new TimestampSpec(null, null, null),
DimensionsSpec.EMPTY,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
new UniformGranularitySpec(
Granularities.DAY,
null,
ImmutableList.of(Intervals.of("2010-01-01/P2D"))
),
null
),
DataSchema.builder()
.withDataSource("foo")
.withTimestamp(new TimestampSpec(null, null, null))
.withDimensions(DimensionsSpec.EMPTY)
.withAggregators(new DoubleSumAggregatorFactory("met", "met"))
.withGranularity(
new UniformGranularitySpec(
Granularities.DAY,
null,
ImmutableList.of(Intervals.of("2010-01-01/P2D"))
)
)
.build(),
new IndexTask.IndexIOConfig(
new LocalInputSource(new File("lol"), "rofl"),
new NoopInputFormat(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
Expand Down Expand Up @@ -211,14 +210,13 @@ public HadoopIndexTask createTask(Interval interval, String version, List<DataSe
);

// generate DataSchema
DataSchema dataSchema = new DataSchema(
dataSourceName,
parser,
aggregators,
granularitySpec,
TransformSpec.NONE,
objectMapper
);
DataSchema dataSchema = DataSchema.builder()
.withDataSource(dataSourceName)
.withParserMap(parser)
.withAggregators(aggregators)
.withGranularity(granularitySpec)
.withObjectMapper(objectMapper)
.build();

// generate DatasourceIngestionSpec
DatasourceIngestionSpec datasourceIngestionSpec = new DatasourceIngestionSpec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
Expand Down Expand Up @@ -237,14 +236,10 @@ public void testCheckSegmentsAndSubmitTasks()
Map<Interval, HadoopIndexTask> runningTasks = runningTasksPair.lhs;
Map<Interval, String> runningVersion = runningTasksPair.rhs;

DataSchema dataSchema = new DataSchema(
"test_datasource",
null,
null,
null,
TransformSpec.NONE,
objectMapper
);
DataSchema dataSchema = DataSchema.builder()
.withDataSource("test_datasource")
.withObjectMapper(objectMapper)
.build();
HadoopIOConfig hadoopIOConfig = new HadoopIOConfig(new HashMap<>(), null, null);
HadoopIngestionSpec spec = new HadoopIngestionSpec(dataSchema, hadoopIOConfig, null);
HadoopIndexTask task1 = new HadoopIndexTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
Expand All @@ -44,7 +43,6 @@
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
Expand Down Expand Up @@ -102,16 +100,19 @@ private static DataSchema getDataSchema(String dataSource)
dimensions.add(StringDimensionSchema.create("dim1"));
dimensions.add(StringDimensionSchema.create("dim2"));

return new DataSchema(
dataSource,
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(dimensions),
new AggregatorFactory[] {new CountAggregatorFactory("rows")},
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.NONE,
ImmutableList.of()),
null);
return DataSchema.builder()
.withDataSource(dataSource)
.withTimestamp(new TimestampSpec("timestamp", "iso", null))
.withDimensions(dimensions)
.withAggregators(new CountAggregatorFactory("rows"))
.withGranularity(
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.NONE,
ImmutableList.of()
)
)
.build();
}

@BeforeClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,26 @@ public boolean equals(Object o)
&& stringEncoding == that.stringEncoding;
}

@Nullable
@Override
public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated)
{
if (this == preAggregated) {
return getCombiningFactory();
}
if (getClass() != preAggregated.getClass()) {
return null;
}
HllSketchAggregatorFactory that = (HllSketchAggregatorFactory) preAggregated;
if (lgK == that.lgK && tgtHllType == that.tgtHllType && stringEncoding == that.stringEncoding && Objects.equals(
fieldName,
that.fieldName
)) {
return getCombiningFactory();
}
return null;
}

@Override
public int hashCode()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,42 @@ public void testEqualsOtherMatches()
Assert.assertArrayEquals(target.getCacheKey(), other.getCacheKey());
}

@Test
public void testCanSubstitute()
{
HllSketchBuildAggregatorFactory factory = new HllSketchBuildAggregatorFactory(
NAME,
FIELD_NAME,
LG_K,
TGT_HLL_TYPE,
STRING_ENCODING,
true,
true
);
HllSketchBuildAggregatorFactory other = new HllSketchBuildAggregatorFactory(
"other name",
FIELD_NAME,
LG_K,
TGT_HLL_TYPE,
STRING_ENCODING,
false,
false
);

HllSketchBuildAggregatorFactory incompatible = new HllSketchBuildAggregatorFactory(
NAME,
"different field",
LG_K,
TGT_HLL_TYPE,
STRING_ENCODING,
false,
false
);
Assert.assertNotNull(other.substituteCombiningFactory(factory));
Assert.assertNotNull(factory.substituteCombiningFactory(other));
Assert.assertNull(factory.substituteCombiningFactory(incompatible));
}

@Test
public void testToString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.filter.SelectorDimFilter;
Expand Down Expand Up @@ -1262,28 +1261,27 @@ public void testKafkaRecordEntityInputFormat() throws Exception

final KafkaIndexTask task = createTask(
null,
new DataSchema(
"test_ds",
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat"),
new StringDimensionSchema("kafka.topic"),
new LongDimensionSchema("kafka.offset"),
new StringDimensionSchema("kafka.header.encoding")
)
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null
),
DataSchema.builder()
.withDataSource("test_ds")
.withTimestamp(new TimestampSpec("timestamp", "iso", null))
.withDimensions(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat"),
new StringDimensionSchema("kafka.topic"),
new LongDimensionSchema("kafka.offset"),
new StringDimensionSchema("kafka.header.encoding")
)
.withAggregators(
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
)
.withGranularity(
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)
)
.build(),
new KafkaIndexTaskIOConfig(
0,
"sequence0",
Expand Down Expand Up @@ -1337,26 +1335,25 @@ public void testKafkaInputFormat() throws Exception

final KafkaIndexTask task = createTask(
null,
new DataSchema(
"test_ds",
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat"),
new StringDimensionSchema("kafka.testheader.encoding")
)
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null
),
DataSchema.builder()
.withDataSource("test_ds")
.withTimestamp(new TimestampSpec("timestamp", "iso", null))
.withDimensions(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat"),
new StringDimensionSchema("kafka.testheader.encoding")
)
.withAggregators(
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
)
.withGranularity(
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)
)
.build(),
new KafkaIndexTaskIOConfig(
0,
"sequence0",
Expand Down Expand Up @@ -2887,16 +2884,7 @@ private KafkaIndexTask createTask(

private static DataSchema cloneDataSchema(final DataSchema dataSchema)
{
return new DataSchema(
dataSchema.getDataSource(),
dataSchema.getTimestampSpec(),
dataSchema.getDimensionsSpec(),
dataSchema.getAggregators(),
dataSchema.getGranularitySpec(),
dataSchema.getTransformSpec(),
dataSchema.getParserMap(),
OBJECT_MAPPER
);
return DataSchema.builder(dataSchema).withObjectMapper(OBJECT_MAPPER).build();
}

@Override
Expand Down
Loading