Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public HadoopIndexTask createTask(Interval interval, String version, List<DataSe
tuningConfig.getShardSpecs(),
tuningConfig.getIndexSpec(),
tuningConfig.getIndexSpecForIntermediatePersists(),
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getRowFlushBoundary(),
tuningConfig.getMaxBytesInMemory(),
tuningConfig.isLeaveIntermediate(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class MaterializedViewSupervisorTest
private TaskQueue taskQueue;
private MaterializedViewSupervisor supervisor;
private String derivativeDatasourceName;
private MaterializedViewSupervisorSpec spec;
private final ObjectMapper objectMapper = TestHelper.makeJsonMapper();

@Before
Expand All @@ -103,7 +104,7 @@ public void setUp()
taskQueue = EasyMock.createMock(TaskQueue.class);
taskQueue.start();
objectMapper.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed"));
MaterializedViewSupervisorSpec spec = new MaterializedViewSupervisorSpec(
spec = new MaterializedViewSupervisorSpec(
"base",
new DimensionsSpec(Collections.singletonList(new StringDimensionSchema("dim")), null, null),
new AggregatorFactory[]{new LongSumAggregatorFactory("m1", "m1")},
Expand Down Expand Up @@ -317,6 +318,35 @@ public void testCheckSegmentsAndSubmitTasks() throws IOException

}

/**
* Verifies that creating HadoopIndexTask completes without raising an exception.
*/
@Test
Comment thread
liran-funaro marked this conversation as resolved.
public void testCreateTask()
{
// A single base-datasource segment is enough to drive task creation.
final DataSegment baseSegment = new DataSegment(
"base",
Intervals.of("2015-01-02T00Z/2015-01-03T00Z"),
"2015-01-03",
ImmutableMap.of(),
ImmutableList.of("dim1", "dim2"),
ImmutableList.of("m1"),
new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null),
9,
1024
);

// Creating the HadoopIndexTask must not throw; a non-null task means the
// supervisor spec wired all tuning-config fields through successfully.
final HadoopIndexTask createdTask = spec.createTask(
Intervals.of("2015-01-02T00Z/2015-01-03T00Z"),
"2015-01-03",
Collections.singletonList(baseSegment)
);

Assert.assertNotNull(createdTask);
}

@Test
public void testSuspendedDoesntRun()
{
Expand Down
5 changes: 5 additions & 0 deletions extensions-core/kafka-indexing-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Period;

Expand All @@ -33,6 +34,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
{
@JsonCreator
public KafkaIndexTaskTuningConfig(
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
Expand All @@ -55,6 +57,7 @@ public KafkaIndexTaskTuningConfig(
)
{
super(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
Expand All @@ -81,6 +84,7 @@ public KafkaIndexTaskTuningConfig(
public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir)
{
return new KafkaIndexTaskTuningConfig(
getAppendableIndexSpec(),
getMaxRowsInMemory(),
getMaxBytesInMemory(),
getMaxRowsPerSegment(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.indexing.TuningConfigs;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Duration;
import org.joda.time.Period;
Expand Down Expand Up @@ -67,11 +67,13 @@ public static KafkaSupervisorTuningConfig defaultConfig()
null,
null,
null,
null,
null
);
}

public KafkaSupervisorTuningConfig(
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
Expand Down Expand Up @@ -100,6 +102,7 @@ public KafkaSupervisorTuningConfig(
)
{
super(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
Expand Down Expand Up @@ -193,7 +196,7 @@ public String toString()
"maxRowsInMemory=" + getMaxRowsInMemory() +
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
", maxTotalRows=" + getMaxTotalRows() +
", maxBytesInMemory=" + TuningConfigs.getMaxBytesInMemoryOrDefault(getMaxBytesInMemory()) +
", maxBytesInMemory=" + getMaxBytesInMemoryOrDefault() +
Comment thread
liran-funaro marked this conversation as resolved.
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
", basePersistDirectory=" + getBasePersistDirectory() +
", maxPendingPersists=" + getMaxPendingPersists() +
Expand All @@ -219,6 +222,7 @@ public String toString()
public KafkaIndexTaskTuningConfig convertToTaskTuningConfig()
{
return new KafkaIndexTaskTuningConfig(
getAppendableIndexSpec(),
getMaxRowsInMemory(),
getMaxBytesInMemory(),
getMaxRowsPerSegment(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2497,6 +2497,7 @@ private KafkaIndexTask createTask(
) throws JsonProcessingException
{
final KafkaIndexTaskTuningConfig tuningConfig = new KafkaIndexTaskTuningConfig(
null,
1000,
null,
maxRowsPerSegment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
import org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.indexing.TuningConfig;
import org.joda.time.Period;
import org.junit.Assert;
Expand Down Expand Up @@ -60,6 +62,7 @@ public void testSerdeWithDefaults() throws Exception
);

Assert.assertNotNull(config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
Assert.assertNull(config.getMaxTotalRows());
Expand All @@ -85,7 +88,8 @@ public void testSerdeWithNonDefaults() throws Exception
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100,\n"
+ " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n"
+ " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n"
+ " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" },\n"
+ " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
+ "}";

KafkaIndexTaskTuningConfig config = (KafkaIndexTaskTuningConfig) mapper.readValue(
Expand All @@ -99,6 +103,7 @@ public void testSerdeWithNonDefaults() throws Exception
);

Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
Assert.assertNotEquals(null, config.getMaxTotalRows());
Expand All @@ -115,6 +120,7 @@ public void testSerdeWithNonDefaults() throws Exception
public void testConvert()
{
KafkaSupervisorTuningConfig original = new KafkaSupervisorTuningConfig(
null,
Comment thread
liran-funaro marked this conversation as resolved.
1,
null,
2,
Expand Down Expand Up @@ -142,6 +148,7 @@ public void testConvert()
);
KafkaIndexTaskTuningConfig copy = (KafkaIndexTaskTuningConfig) original.convertToTaskTuningConfig();

Assert.assertEquals(original.getAppendableIndexSpec(), copy.getAppendableIndexSpec());
Assert.assertEquals(1, copy.getMaxRowsInMemory());
Assert.assertEquals(2, copy.getMaxRowsPerSegment().intValue());
Assert.assertNotEquals(null, copy.getMaxTotalRows());
Expand All @@ -158,6 +165,7 @@ public void testConvert()
public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
{
KafkaIndexTaskTuningConfig base = new KafkaIndexTaskTuningConfig(
null,
1,
null,
2,
Expand All @@ -183,6 +191,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
mapper.readValue(serialized, TestModifiedKafkaIndexTaskTuningConfig.class);

Assert.assertEquals(null, deserialized.getExtra());
Assert.assertEquals(base.getAppendableIndexSpec(), deserialized.getAppendableIndexSpec());
Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
Expand All @@ -206,6 +215,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
{
TestModifiedKafkaIndexTaskTuningConfig base = new TestModifiedKafkaIndexTaskTuningConfig(
null,
1,
null,
2,
Expand All @@ -231,6 +241,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
KafkaIndexTaskTuningConfig deserialized =
mapper.readValue(serialized, KafkaIndexTaskTuningConfig.class);

Assert.assertEquals(base.getAppendableIndexSpec(), deserialized.getAppendableIndexSpec());
Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
Expand All @@ -249,4 +260,12 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
}

@Test
public void testEqualsAndHashCode()
{
// EqualsVerifier exhaustively checks the equals()/hashCode() contract
// (reflexivity, symmetry, transitivity, hashCode consistency).
// usingGetClass() tells it the class implements equals() with a
// getClass() comparison rather than instanceof, so subclasses are
// never considered equal to the parent.
EqualsVerifier.forClass(KafkaIndexTaskTuningConfig.class)
.usingGetClass()
.verify();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ public void testCreateBaseTaskContexts() throws JsonProcessingException
null,
null,
null,
null,
null
),
null
Expand Down Expand Up @@ -3070,6 +3071,7 @@ public void testIsTaskCurrent()
kafkaHost,
dataSchema,
new KafkaSupervisorTuningConfig(
null,
1000,
null,
50000,
Expand Down Expand Up @@ -3109,6 +3111,7 @@ public void testIsTaskCurrent()
DataSchema modifiedDataSchema = getDataSchema("some other datasource");

KafkaSupervisorTuningConfig modifiedTuningConfig = new KafkaSupervisorTuningConfig(
null,
42, // This is different
null,
50000,
Expand Down Expand Up @@ -3404,6 +3407,7 @@ public KafkaIndexTaskClient build(
};

final KafkaSupervisorTuningConfig tuningConfig = new KafkaSupervisorTuningConfig(
null,
1000,
null,
50000,
Expand Down Expand Up @@ -3514,6 +3518,7 @@ public KafkaIndexTaskClient build(
};

final KafkaSupervisorTuningConfig tuningConfig = new KafkaSupervisorTuningConfig(
null,
1000,
null,
50000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.indexing.TuningConfig;
import org.joda.time.Duration;
import org.joda.time.Period;
Expand Down Expand Up @@ -59,6 +60,7 @@ public void testSerdeWithDefaults() throws Exception
);

Assert.assertNotNull(config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
Expand Down Expand Up @@ -94,7 +96,8 @@ public void testSerdeWithNonDefaults() throws Exception
+ " \"shutdownTimeout\": \"PT95S\",\n"
+ " \"offsetFetchPeriod\": \"PT20S\",\n"
+ " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n"
+ " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n"
+ " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" },\n"
+ " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
+ "}";

KafkaSupervisorTuningConfig config = (KafkaSupervisorTuningConfig) mapper.readValue(
Expand All @@ -108,6 +111,7 @@ public void testSerdeWithNonDefaults() throws Exception
);

Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Period;

Expand All @@ -37,6 +38,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuning

@JsonCreator
public TestModifiedKafkaIndexTaskTuningConfig(
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
Expand All @@ -60,6 +62,7 @@ public TestModifiedKafkaIndexTaskTuningConfig(
)
{
super(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
Expand Down
5 changes: 5 additions & 0 deletions extensions-core/kinesis-indexing-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@
<artifactId>system-rules</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Loading