Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.indexing.DataSchema;
Expand Down Expand Up @@ -66,18 +65,19 @@ public static Task getTask()
null,
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(
"foo",
new TimestampSpec(null, null, null),
DimensionsSpec.EMPTY,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
new UniformGranularitySpec(
Granularities.DAY,
null,
ImmutableList.of(Intervals.of("2010-01-01/P2D"))
),
null
),
DataSchema.builder()
.withDataSource("foo")
.withTimestamp(new TimestampSpec(null, null, null))
.withDimensions(DimensionsSpec.EMPTY)
.withAggregators(new DoubleSumAggregatorFactory("met", "met"))
.withGranularity(
new UniformGranularitySpec(
Granularities.DAY,
null,
ImmutableList.of(Intervals.of("2010-01-01/P2D"))
)
)
.build(),
new IndexTask.IndexIOConfig(
new LocalInputSource(new File("lol"), "rofl"),
new NoopInputFormat(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
Expand Down Expand Up @@ -211,14 +210,13 @@ public HadoopIndexTask createTask(Interval interval, String version, List<DataSe
);

// generate DataSchema
DataSchema dataSchema = new DataSchema(
dataSourceName,
parser,
aggregators,
granularitySpec,
TransformSpec.NONE,
objectMapper
);
DataSchema dataSchema = DataSchema.builder()
.withDataSource(dataSourceName)
.withParserMap(parser)
.withAggregators(aggregators)
.withGranularity(granularitySpec)
.withObjectMapper(objectMapper)
.build();

// generate DatasourceIngestionSpec
DatasourceIngestionSpec datasourceIngestionSpec = new DatasourceIngestionSpec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
Expand Down Expand Up @@ -237,14 +236,10 @@ public void testCheckSegmentsAndSubmitTasks()
Map<Interval, HadoopIndexTask> runningTasks = runningTasksPair.lhs;
Map<Interval, String> runningVersion = runningTasksPair.rhs;

DataSchema dataSchema = new DataSchema(
"test_datasource",
null,
null,
null,
TransformSpec.NONE,
objectMapper
);
DataSchema dataSchema = DataSchema.builder()
.withDataSource("test_datasource")
.withObjectMapper(objectMapper)
.build();
HadoopIOConfig hadoopIOConfig = new HadoopIOConfig(new HashMap<>(), null, null);
HadoopIngestionSpec spec = new HadoopIngestionSpec(dataSchema, hadoopIOConfig, null);
HadoopIndexTask task1 = new HadoopIndexTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
Expand All @@ -44,7 +43,6 @@
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
Expand Down Expand Up @@ -102,16 +100,19 @@ private static DataSchema getDataSchema(String dataSource)
dimensions.add(StringDimensionSchema.create("dim1"));
dimensions.add(StringDimensionSchema.create("dim2"));

return new DataSchema(
dataSource,
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(dimensions),
new AggregatorFactory[] {new CountAggregatorFactory("rows")},
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.NONE,
ImmutableList.of()),
null);
return DataSchema.builder()
.withDataSource(dataSource)
.withTimestamp(new TimestampSpec("timestamp", "iso", null))
.withDimensions(dimensions)
.withAggregators(new CountAggregatorFactory("rows"))
.withGranularity(
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.NONE,
ImmutableList.of()
)
)
.build();
}

@BeforeClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.filter.SelectorDimFilter;
Expand Down Expand Up @@ -1262,28 +1261,27 @@ public void testKafkaRecordEntityInputFormat() throws Exception

final KafkaIndexTask task = createTask(
null,
new DataSchema(
"test_ds",
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat"),
new StringDimensionSchema("kafka.topic"),
new LongDimensionSchema("kafka.offset"),
new StringDimensionSchema("kafka.header.encoding")
)
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null
),
DataSchema.builder()
.withDataSource("test_ds")
.withTimestamp(new TimestampSpec("timestamp", "iso", null))
.withDimensions(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat"),
new StringDimensionSchema("kafka.topic"),
new LongDimensionSchema("kafka.offset"),
new StringDimensionSchema("kafka.header.encoding")
)
.withAggregators(
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
)
.withGranularity(
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)
)
.build(),
new KafkaIndexTaskIOConfig(
0,
"sequence0",
Expand Down Expand Up @@ -1337,26 +1335,25 @@ public void testKafkaInputFormat() throws Exception

final KafkaIndexTask task = createTask(
null,
new DataSchema(
"test_ds",
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat"),
new StringDimensionSchema("kafka.testheader.encoding")
)
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null
),
DataSchema.builder()
.withDataSource("test_ds")
.withTimestamp(new TimestampSpec("timestamp", "iso", null))
.withDimensions(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat"),
new StringDimensionSchema("kafka.testheader.encoding")
)
.withAggregators(
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
)
.withGranularity(
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)
)
.build(),
new KafkaIndexTaskIOConfig(
0,
"sequence0",
Expand Down Expand Up @@ -2888,16 +2885,7 @@ private KafkaIndexTask createTask(

private static DataSchema cloneDataSchema(final DataSchema dataSchema)
{
return new DataSchema(
dataSchema.getDataSource(),
dataSchema.getTimestampSpec(),
dataSchema.getDimensionsSpec(),
dataSchema.getAggregators(),
dataSchema.getGranularitySpec(),
dataSchema.getTransformSpec(),
dataSchema.getParserMap(),
OBJECT_MAPPER
);
return DataSchema.builder(dataSchema).withObjectMapper(OBJECT_MAPPER).build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.segment.TestHelper;
Expand Down Expand Up @@ -81,45 +80,30 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest

private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
private static final String TOPIC = "sampling";
private static final DataSchema DATA_SCHEMA = new DataSchema(
"test_ds",
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat")
)
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null
);

private static final DataSchema DATA_SCHEMA_KAFKA_TIMESTAMP = new DataSchema(
"test_ds",
new TimestampSpec("kafka.timestamp", "iso", null),
new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat")
)
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null
);
private static final DataSchema DATA_SCHEMA =
DataSchema.builder()
.withDataSource("test_ds")
.withTimestamp(new TimestampSpec("timestamp", "iso", null))
.withDimensions(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat")
)
.withAggregators(
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
)
.withGranularity(
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)
)
.build();

private static final DataSchema DATA_SCHEMA_KAFKA_TIMESTAMP =
DataSchema.builder(DATA_SCHEMA)
.withTimestamp(new TimestampSpec("kafka.timestamp", "iso", null))
.build();

private static TestingCluster zkServer;
private static TestBroker kafkaServer;
Expand Down Expand Up @@ -364,17 +348,18 @@ public void testWithInputRowParser() throws IOException
);
InputRowParser parser = new StringInputRowParser(new JSONParseSpec(timestampSpec, dimensionsSpec, JSONPathSpec.DEFAULT, null, null), "UTF8");

DataSchema dataSchema = new DataSchema(
"test_ds",
objectMapper.readValue(objectMapper.writeValueAsBytes(parser), Map.class),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null,
objectMapper
);
DataSchema dataSchema = DataSchema.builder()
.withDataSource("test_ds")
.withParserMap(
objectMapper.readValue(objectMapper.writeValueAsBytes(parser), Map.class)
)
.withAggregators(
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
)
.withGranularity(new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null))
.withObjectMapper(objectMapper)
.build();

KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
null,
Expand Down
Loading