From 2d314e3cac8d7b436e0b042f857826a0e049cf33 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 1 Nov 2019 19:03:52 -0700 Subject: [PATCH 01/12] accept spec or dataSchema, tuningConfig, ioConfig while submitting task json --- .../druid/indexing/common/task/IndexTask.java | 28 ++- .../task/CompactionTaskParallelRunTest.java | 3 + .../common/task/CompactionTaskRunTest.java | 6 + .../indexing/common/task/IndexTaskTest.java | 59 ++++++ .../indexing/common/task/TaskSerdeTest.java | 176 ++++++++++++++++++ .../indexing/overlord/TaskLifecycleTest.java | 12 ++ 6 files changed, 279 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 51b44f18d117..5716a58f7c94 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -123,9 +123,24 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler private static final HashFunction HASH_FUNCTION = Hashing.murmur3_128(); private static final String TYPE = "index"; - private static String makeGroupId(IndexIngestionSpec ingestionSchema) + private static String makeGroupId( + IndexIngestionSpec ingestionSchema, + DataSchema dataSchema, + IndexIOConfig ioConfig, + TuningConfig tuningConfig + ) { - return makeGroupId(ingestionSchema.ioConfig.appendToExisting, ingestionSchema.dataSchema.getDataSource()); + final boolean isValid = (ingestionSchema != null) ^ (dataSchema != null + && ioConfig != null + && tuningConfig != null); + if (!isValid) { + throw new ISE("invalid spec input"); + } + if (ingestionSchema == null) { + return makeGroupId(ioConfig.appendToExisting, dataSchema.getDataSource()); + } else { + return makeGroupId(ingestionSchema.ioConfig.appendToExisting, ingestionSchema.dataSchema.getDataSource()); + } } private static String makeGroupId(boolean isAppendToExisting, String dataSource) @@ -174,6 +189,9 @@ public IndexTask( @JsonProperty("id") final String id, @JsonProperty("resource") final TaskResource taskResource, @JsonProperty("spec") final IndexIngestionSpec ingestionSchema, + @JsonProperty("dataSchema") DataSchema dataSchema, + @JsonProperty("ioConfig") IndexIOConfig ioConfig, + @JsonProperty("tuningConfig") IndexTuningConfig tuningConfig, @JsonProperty("context") final Map context, @JacksonInject AuthorizerMapper authorizerMapper, @JacksonInject ChatHandlerProvider chatHandlerProvider, @@ -183,10 +201,10 @@ public IndexTask( { this( id, - makeGroupId(ingestionSchema), + makeGroupId(ingestionSchema, dataSchema, ioConfig, tuningConfig), taskResource, - ingestionSchema.dataSchema.getDataSource(), - ingestionSchema, + ingestionSchema == null ? dataSchema.getDataSource() : ingestionSchema.dataSchema.getDataSource(), + ingestionSchema == null ? new IndexIngestionSpec(dataSchema, ioConfig, tuningConfig) : ingestionSchema, context, authorizerMapper, chatHandlerProvider, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index 330fed26701d..32c8d632ccc1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -197,6 +197,9 @@ private void runIndexTask() throws Exception false ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, new NoopChatHandlerProvider(), rowIngestionMetersFactory, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 9a7fa51bad8e..3b0e450e565f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -361,6 +361,9 @@ public void testRunIndexAndCompactAtTheSameTimeForDifferentInterval() throws Exc false ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, new NoopChatHandlerProvider(), rowIngestionMetersFactory, @@ -690,6 +693,9 @@ private Pair> runIndexTask( appendToExisting ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, new NoopChatHandlerProvider(), rowIngestionMetersFactory, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 8cb99452a80b..83878de52984 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -206,6 +206,9 @@ public void testDeterminePartitions() throws Exception false ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -260,6 +263,9 @@ public void testTransformSpec() throws Exception false ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -306,6 +312,9 @@ public void testWithArbitraryGranularity() throws Exception false ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -345,6 +354,9 @@ public void testIntervalBucketing() throws Exception false ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -380,6 +392,9 @@ public void testNumShardsProvided() throws Exception false ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -420,6 +435,9 @@ public void testNumShardsAndPartitionDimensionsProvided() throws Exception false ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -496,6 +514,9 @@ public void testAppendToExisting() throws Exception true ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -548,6 +569,9 @@ public void testIntervalNotSpecified() throws Exception false ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -613,6 +637,9 @@ public void testCSVFileWithHeader() throws Exception false ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -667,6 +694,9 @@ public void testCSVFileWithHeaderColumnOverride() throws Exception false ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -716,6 +746,9 @@ public void testWithSmallMaxTotalRows() throws Exception false ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -763,6 +796,9 @@ public void testPerfectRollup() throws Exception false ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -809,6 +845,9 @@ public void testBestEffortRollup() throws Exception false ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -885,6 +924,8 @@ public void testIgnoreParseException() throws Exception ); IndexTask indexTask = new IndexTask( + null, + null, null, null, parseExceptionIgnoreSpec, @@ -940,6 +981,8 @@ public void testReportParseException() throws Exception ); IndexTask indexTask = new IndexTask( + null, + null, null, null, parseExceptionIgnoreSpec, @@ -1037,6 +1080,8 @@ public void testMultipleParseExceptionsSuccess() throws Exception ); IndexTask indexTask = new IndexTask( + null, + null, null, null, parseExceptionIgnoreSpec, @@ -1164,6 +1209,8 @@ public void testMultipleParseExceptionsFailure() throws Exception ); IndexTask indexTask = new IndexTask( + null, + null, null, null, parseExceptionIgnoreSpec, @@ -1282,6 +1329,8 @@ public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc ); IndexTask indexTask = new IndexTask( + null, + null, null, null, parseExceptionIgnoreSpec, @@ -1383,6 +1432,8 @@ public void testCsvWithHeaderOfEmptyColumns() throws Exception ); IndexTask indexTask = new IndexTask( + null, + null, null, null, parseExceptionIgnoreSpec, @@ -1455,6 +1506,8 @@ public void testCsvWithHeaderOfEmptyTimestamp() throws Exception ); IndexTask indexTask = new IndexTask( + null, + null, null, null, parseExceptionIgnoreSpec, @@ -1508,6 +1561,9 @@ public void testOverwriteWithSameSegmentGranularity() throws Exception false ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -1576,6 +1632,9 @@ public void testOverwriteWithDifferentSegmentGranularity() throws Exception false ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java index e49e61090887..0b6a19560cc8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java @@ -34,6 +34,7 @@ import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig; import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec; import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -219,6 +220,9 @@ public void testIndexTaskSerde() throws Exception ) ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -259,6 +263,175 @@ public void testIndexTaskSerde() throws Exception Assert.assertEquals(taskTuningConfig.isReportParseExceptions(), task2TuningConfig.isReportParseExceptions()); } + @Test + public void testIndexTaskNullSpecSerde() throws Exception + { + final IndexTask task = new IndexTask( + null, + null, + null, + new DataSchema( + "foo", + null, + new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, + new UniformGranularitySpec( + Granularities.DAY, + null, + ImmutableList.of(Intervals.of("2010-01-01/P2D")) + ), + null, + jsonMapper + ), + new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), + new IndexTuningConfig( + null, + null, + 10, + null, + null, + 9999, + null, + null, + new DynamicPartitionsSpec(10000, null), + indexSpec, + null, + 3, + false, + null, + null, + null, + null, + null, + null, + null + ), + null, + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + null, + rowIngestionMetersFactory, + null + ); + + final String json = jsonMapper.writeValueAsString(task); + + Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change + final IndexTask task2 = (IndexTask) jsonMapper.readValue(json, Task.class); + + Assert.assertEquals("foo", task.getDataSource()); + + Assert.assertEquals(task.getId(), task2.getId()); + Assert.assertEquals(task.getGroupId(), task2.getGroupId()); + Assert.assertEquals(task.getDataSource(), task2.getDataSource()); + + IndexTask.IndexIOConfig taskIoConfig = task.getIngestionSchema().getIOConfig(); + IndexTask.IndexIOConfig task2IoConfig = task2.getIngestionSchema().getIOConfig(); + + Assert.assertTrue(taskIoConfig.getFirehoseFactory() instanceof LocalFirehoseFactory); + Assert.assertTrue(task2IoConfig.getFirehoseFactory() instanceof LocalFirehoseFactory); + Assert.assertEquals(taskIoConfig.isAppendToExisting(), task2IoConfig.isAppendToExisting()); + + IndexTask.IndexTuningConfig taskTuningConfig = task.getIngestionSchema().getTuningConfig(); + IndexTask.IndexTuningConfig task2TuningConfig = task2.getIngestionSchema().getTuningConfig(); + + Assert.assertEquals(taskTuningConfig.getBasePersistDirectory(), task2TuningConfig.getBasePersistDirectory()); + Assert.assertEquals(taskTuningConfig.getIndexSpec(), task2TuningConfig.getIndexSpec()); + Assert.assertEquals( + taskTuningConfig.getIntermediatePersistPeriod(), + task2TuningConfig.getIntermediatePersistPeriod() + ); + Assert.assertEquals(taskTuningConfig.getMaxPendingPersists(), task2TuningConfig.getMaxPendingPersists()); + Assert.assertEquals(taskTuningConfig.getMaxRowsInMemory(), task2TuningConfig.getMaxRowsInMemory()); + Assert.assertEquals(taskTuningConfig.getNumShards(), task2TuningConfig.getNumShards()); + Assert.assertEquals(taskTuningConfig.getMaxRowsPerSegment(), task2TuningConfig.getMaxRowsPerSegment()); + Assert.assertEquals(taskTuningConfig.isReportParseExceptions(), task2TuningConfig.isReportParseExceptions()); + } + + @Test(expected = ISE.class) + public void testIndexTaskInvalidSpecSerde() + { + new IndexTask( + null, + null, + new IndexIngestionSpec( + new DataSchema( + "foo", + null, + new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, + new UniformGranularitySpec( + Granularities.DAY, + null, + ImmutableList.of(Intervals.of("2010-01-01/P2D")) + ), + null, + jsonMapper + ), + new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), + new IndexTuningConfig( + null, + null, + 10, + null, + null, + 9999, + null, + null, + new DynamicPartitionsSpec(10000, null), + indexSpec, + null, + 3, + false, + null, + null, + null, + null, + null, + null, + null + ) + ), + new DataSchema( + "foo", + null, + new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, + new UniformGranularitySpec( + Granularities.DAY, + null, + ImmutableList.of(Intervals.of("2010-01-01/P2D")) + ), + null, + jsonMapper + ), + new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), + new IndexTuningConfig( + null, + null, + 10, + null, + null, + 9999, + null, + null, + new DynamicPartitionsSpec(10000, null), + indexSpec, + null, + 3, + false, + null, + null, + null, + null, + null, + null, + null + ), + null, + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + null, + rowIngestionMetersFactory, + null + ); + } + @Test public void testIndexTaskwithResourceSerde() throws Exception { @@ -303,6 +476,9 @@ public void testIndexTaskwithResourceSerde() throws Exception ) ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 74c61d563eef..fb30ad6435f6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -702,6 +702,9 @@ public void testIndexTask() throws Exception ) ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, ROW_INGESTION_METERS_FACTORY, @@ -786,6 +789,9 @@ public void testIndexTaskFailure() throws Exception ) ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, ROW_INGESTION_METERS_FACTORY, @@ -1208,6 +1214,9 @@ public void testResumeTasks() throws Exception ) ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, ROW_INGESTION_METERS_FACTORY, @@ -1314,6 +1323,9 @@ public void testUnifiedAppenderatorsManagerCleanup() throws Exception ) ), null, + null, + null, + null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, ROW_INGESTION_METERS_FACTORY, From bd4c1ce1eb6def7c229ea03b4fa7696baad31de9 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 1 Nov 2019 22:31:41 -0700 Subject: [PATCH 02/12] fix test --- .../indexing/common/task/IndexTaskTest.java | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 83878de52984..597595942664 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -926,9 +926,10 @@ public void testIgnoreParseException() throws Exception IndexTask indexTask = new IndexTask( null, null, + parseExceptionIgnoreSpec, + null, null, null, - parseExceptionIgnoreSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, @@ -983,9 +984,10 @@ public void testReportParseException() throws Exception IndexTask indexTask = new IndexTask( null, null, + parseExceptionIgnoreSpec, + null, null, null, - parseExceptionIgnoreSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, @@ -1082,9 +1084,10 @@ public void testMultipleParseExceptionsSuccess() throws Exception IndexTask indexTask = new IndexTask( null, null, + parseExceptionIgnoreSpec, + null, null, null, - parseExceptionIgnoreSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, @@ -1211,9 +1214,10 @@ public void testMultipleParseExceptionsFailure() throws Exception IndexTask indexTask = new IndexTask( null, null, + parseExceptionIgnoreSpec, + null, null, null, - parseExceptionIgnoreSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, @@ -1331,9 +1335,10 @@ public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc IndexTask indexTask = new IndexTask( null, null, + parseExceptionIgnoreSpec, + null, null, null, - parseExceptionIgnoreSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, @@ -1434,9 +1439,10 @@ public void testCsvWithHeaderOfEmptyColumns() throws Exception IndexTask indexTask = new IndexTask( null, null, + parseExceptionIgnoreSpec, + null, null, null, - parseExceptionIgnoreSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, @@ -1508,9 +1514,10 @@ public void testCsvWithHeaderOfEmptyTimestamp() throws Exception IndexTask indexTask = new IndexTask( null, null, + parseExceptionIgnoreSpec, null, null, - parseExceptionIgnoreSpec, + null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, From ffff56b44aacb01a17281a5895f9e1736bbc0568 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 1 Nov 2019 23:13:45 -0700 Subject: [PATCH 03/12] update docs --- docs/tutorials/tutorial-batch.md | 4 +- .../quickstart/tutorial/wikipedia-index.json | 108 +++++++++--------- 2 files changed, 54 insertions(+), 58 deletions(-) diff --git a/docs/tutorials/tutorial-batch.md b/docs/tutorials/tutorial-batch.md index e85613667af6..db5ad14e1e05 100644 --- a/docs/tutorials/tutorial-batch.md +++ b/docs/tutorials/tutorial-batch.md @@ -134,8 +134,7 @@ which has been configured to read the `quickstart/tutorial/wikiticker-2015-09-12 ```json { - "type" : "index", - "spec" : { + "type" : "index", "dataSchema" : { "dataSource" : "wikipedia", "parser" : { @@ -194,7 +193,6 @@ which has been configured to read the `quickstart/tutorial/wikiticker-2015-09-12 "maxRowsPerSegment" : 5000000, "maxRowsInMemory" : 25000 } - } } ``` diff --git a/examples/quickstart/tutorial/wikipedia-index.json b/examples/quickstart/tutorial/wikipedia-index.json index 785fbda91679..84d90f8525c0 100644 --- a/examples/quickstart/tutorial/wikipedia-index.json +++ b/examples/quickstart/tutorial/wikipedia-index.json @@ -1,63 +1,61 @@ { "type" : "index", - "spec" : { - "dataSchema" : { - "dataSource" : "wikipedia", - "parser" : { - "type" : "string", - "parseSpec" : { - "format" : "json", - "dimensionsSpec" : { - "dimensions" : [ - "channel", - "cityName", - "comment", - "countryIsoCode", - "countryName", - "isAnonymous", - "isMinor", - "isNew", - "isRobot", - "isUnpatrolled", - "metroCode", - "namespace", - "page", - "regionIsoCode", - "regionName", - "user", - { "name": "added", "type": "long" }, - { "name": "deleted", "type": "long" }, - { "name": "delta", "type": "long" } - ] - }, - "timestampSpec": { - "column": "time", - "format": "iso" - } + "dataSchema" : { + "dataSource" : "wikipedia", + "parser" : { + "type" : "string", + "parseSpec" : { + "format" : "json", + "dimensionsSpec" : { + "dimensions" : [ + "channel", + "cityName", + "comment", + "countryIsoCode", + "countryName", + "isAnonymous", + "isMinor", + "isNew", + "isRobot", + "isUnpatrolled", + "metroCode", + "namespace", + "page", + "regionIsoCode", + "regionName", + "user", + { "name": "added", "type": "long" }, + { "name": "deleted", "type": "long" }, + { "name": "delta", "type": "long" } + ] + }, + "timestampSpec": { + "column": "time", + "format": "iso" } - }, - "metricsSpec" : [], - "granularitySpec" : { - "type" : "uniform", - "segmentGranularity" : "day", - "queryGranularity" : "none", - "intervals" : ["2015-09-12/2015-09-13"], - "rollup" : false } }, - "ioConfig" : { - "type" : "index", - "firehose" : { - "type" : "local", - "baseDir" : "quickstart/tutorial/", - "filter" : "wikiticker-2015-09-12-sampled.json.gz" - }, - "appendToExisting" : false - }, - "tuningConfig" : { - "type" : "index", - "maxRowsPerSegment" : 5000000, - "maxRowsInMemory" : 25000 + "metricsSpec" : [], + "granularitySpec" : { + "type" : "uniform", + "segmentGranularity" : "day", + "queryGranularity" : "none", + "intervals" : ["2015-09-12/2015-09-13"], + "rollup" : false } + }, + "ioConfig" : { + "type" : "index", + "firehose" : { + "type" : "local", + "baseDir" : "quickstart/tutorial/", + "filter" : "wikiticker-2015-09-12-sampled.json.gz" + }, + "appendToExisting" : false + }, + "tuningConfig" : { + "type" : "index", + "maxRowsPerSegment" : 5000000, + "maxRowsInMemory" : 25000 } } From 5cab838c22cbfabda306badf734d13021d605dd9 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 5 Nov 2019 22:54:42 -0800 Subject: [PATCH 04/12] lgtm warning --- .../java/org/apache/druid/indexing/common/task/IndexTask.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 5716a58f7c94..447321abf583 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -137,6 +137,8 @@ private static String makeGroupId( throw new ISE("invalid spec input"); } if (ingestionSchema == null) { + assert (ioConfig != null); + assert (dataSchema != null); return makeGroupId(ioConfig.appendToExisting, dataSchema.getDataSource()); } else { return makeGroupId(ingestionSchema.ioConfig.appendToExisting, ingestionSchema.dataSchema.getDataSource()); From 473a969d31f73b8ee39ed3a57754146e66f779dc Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 13 Nov 2019 12:47:00 -0800 Subject: [PATCH 05/12] Add original constructor back to IndexTask to minimize changes --- .../druid/indexing/common/task/IndexTask.java | 41 ++++- .../task/CompactionTaskParallelRunTest.java | 3 - .../common/task/CompactionTaskRunTest.java | 6 - .../indexing/common/task/IndexTaskTest.java | 154 ++++++++++-------- .../indexing/common/task/TaskSerdeTest.java | 94 ----------- .../indexing/overlord/TaskLifecycleTest.java | 12 -- 6 files changed, 127 insertions(+), 183 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 447321abf583..c02e396cb37e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -123,6 +124,11 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler private static final HashFunction HASH_FUNCTION = Hashing.murmur3_128(); private static final String TYPE = "index"; + private static String makeGroupId(IndexIngestionSpec ingestionSchema) + { + return makeGroupId(ingestionSchema.ioConfig.appendToExisting, ingestionSchema.dataSchema.getDataSource()); + } + private static String makeGroupId( IndexIngestionSpec ingestionSchema, DataSchema dataSchema, @@ -134,7 +140,11 @@ private static String makeGroupId( && ioConfig != null && tuningConfig != null); if (!isValid) { - throw new ISE("invalid spec input"); + if (ingestionSchema == null) { + throw new ISE("invalid spec input, please add dataSchema, ioConfig, tuningConfig to spec"); + } else { + throw new ISE("invalid spec input, please either add spec section or dataSchema, ioConfig, tuningConfig"); + } } if (ingestionSchema == null) { assert (ioConfig != null); @@ -186,11 +196,38 @@ private static String makeGroupId(boolean isAppendToExisting, String dataSource) @JsonIgnore private final AppenderatorsManager appenderatorsManager; - @JsonCreator + @Deprecated + @VisibleForTesting public IndexTask( @JsonProperty("id") final String id, @JsonProperty("resource") final TaskResource taskResource, @JsonProperty("spec") final IndexIngestionSpec ingestionSchema, + @JsonProperty("context") final Map context, + @JacksonInject AuthorizerMapper authorizerMapper, + @JacksonInject ChatHandlerProvider chatHandlerProvider, + @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory, + @JacksonInject AppenderatorsManager appenderatorsManager + ) + { + this( + id, + makeGroupId(ingestionSchema), + taskResource, + ingestionSchema.dataSchema.getDataSource(), + ingestionSchema, + context, + authorizerMapper, + chatHandlerProvider, + rowIngestionMetersFactory, + appenderatorsManager + ); + } + + @JsonCreator + public IndexTask( + @JsonProperty("id") final String id, + @JsonProperty("resource") final TaskResource taskResource, + @Deprecated @JsonProperty("spec") final IndexIngestionSpec ingestionSchema, @JsonProperty("dataSchema") DataSchema dataSchema, @JsonProperty("ioConfig") IndexIOConfig ioConfig, @JsonProperty("tuningConfig") IndexTuningConfig tuningConfig, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java index 32c8d632ccc1..330fed26701d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java @@ -197,9 +197,6 @@ private void runIndexTask() throws Exception false ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, new NoopChatHandlerProvider(), rowIngestionMetersFactory, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 3b0e450e565f..9a7fa51bad8e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -361,9 +361,6 @@ public void testRunIndexAndCompactAtTheSameTimeForDifferentInterval() throws Exc false ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, new NoopChatHandlerProvider(), rowIngestionMetersFactory, @@ -693,9 +690,6 @@ private Pair> runIndexTask( appendToExisting ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, new NoopChatHandlerProvider(), rowIngestionMetersFactory, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 597595942664..371df47b212f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -47,6 +47,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec; import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; @@ -55,6 +56,7 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.filter.SelectorDimFilter; @@ -206,9 +208,6 @@ public void testDeterminePartitions() throws Exception false ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -263,9 +262,6 @@ public void testTransformSpec() throws Exception false ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -312,9 +308,6 @@ public void testWithArbitraryGranularity() throws Exception false ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -354,9 +347,6 @@ public void testIntervalBucketing() throws Exception false ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -392,9 +382,6 @@ public void testNumShardsProvided() throws Exception false ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -435,9 +422,6 @@ public void testNumShardsAndPartitionDimensionsProvided() throws Exception false ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -514,9 +498,6 @@ public void testAppendToExisting() throws Exception true ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -569,9 +550,6 @@ public void testIntervalNotSpecified() throws Exception false ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -637,9 +615,6 @@ public void testCSVFileWithHeader() throws Exception false ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -694,9 +669,6 @@ public void testCSVFileWithHeaderColumnOverride() throws Exception false ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -746,9 +718,6 @@ public void testWithSmallMaxTotalRows() throws Exception false ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -796,9 +765,6 @@ public void testPerfectRollup() throws Exception false ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -845,9 +811,6 @@ public void testBestEffortRollup() throws Exception false ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -928,9 +891,6 @@ public void testIgnoreParseException() throws Exception null, parseExceptionIgnoreSpec, null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -986,9 +946,6 @@ public void testReportParseException() throws Exception null, parseExceptionIgnoreSpec, null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -1086,9 +1043,6 @@ public void testMultipleParseExceptionsSuccess() throws Exception null, parseExceptionIgnoreSpec, null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -1216,9 +1170,6 @@ public void testMultipleParseExceptionsFailure() throws Exception null, parseExceptionIgnoreSpec, null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -1337,9 +1288,6 @@ public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc null, parseExceptionIgnoreSpec, null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -1441,9 +1389,6 @@ public void testCsvWithHeaderOfEmptyColumns() throws Exception null, parseExceptionIgnoreSpec, null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -1516,9 +1461,6 @@ public void testCsvWithHeaderOfEmptyTimestamp() throws Exception null, parseExceptionIgnoreSpec, null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -1568,9 +1510,6 @@ public void testOverwriteWithSameSegmentGranularity() throws Exception false ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -1639,9 +1578,6 @@ public void testOverwriteWithDifferentSegmentGranularity() throws Exception false ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -1665,6 +1601,92 @@ public void testOverwriteWithDifferentSegmentGranularity() throws Exception } } + @Test(expected = ISE.class) + public void testIndexTaskInvalidSpec() + { + new IndexTask( + null, + null, + new IndexIngestionSpec( + new DataSchema( + "foo", + null, + new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, + new UniformGranularitySpec( + Granularities.DAY, + null, + ImmutableList.of(Intervals.of("2010-01-01/P2D")) + ), + null, + jsonMapper + ), + new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), + new IndexTuningConfig( + null, + null, + 10, + null, + null, + 9999, + null, + null, + new DynamicPartitionsSpec(10000, null), + new IndexSpec(), + null, + 3, + false, + null, + null, + null, + null, + null, + null, + null + ) + ), + new DataSchema( + "foo", + null, + new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, + new UniformGranularitySpec( + Granularities.DAY, + null, + ImmutableList.of(Intervals.of("2010-01-01/P2D")) + ), + null, + jsonMapper + ), + new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), + new IndexTuningConfig( + null, + null, + 10, + null, + null, + 9999, + null, + null, + new DynamicPartitionsSpec(10000, null), + new IndexSpec(), + null, + 3, + false, + null, + null, + null, + null, + null, + null, + null + ), + null, + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + null, + rowIngestionMetersFactory, + null + ); + } + public static void checkTaskStatusErrorMsgForParseExceptionsExceeded(TaskStatus status) { // full stacktrace will be too long and make tests brittle (e.g. if line # changes), just match the main message diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java index 0b6a19560cc8..d5ecc2d3bd2c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java @@ -34,7 +34,6 @@ import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig; import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec; import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -220,9 +219,6 @@ public void testIndexTaskSerde() throws Exception ) ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, @@ -314,7 +310,6 @@ public void testIndexTaskNullSpecSerde() throws Exception final String json = jsonMapper.writeValueAsString(task); - Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change final IndexTask task2 = (IndexTask) jsonMapper.readValue(json, Task.class); Assert.assertEquals("foo", task.getDataSource()); @@ -346,92 +341,6 @@ public void testIndexTaskNullSpecSerde() throws Exception Assert.assertEquals(taskTuningConfig.isReportParseExceptions(), task2TuningConfig.isReportParseExceptions()); } - @Test(expected = ISE.class) - public void testIndexTaskInvalidSpecSerde() - { - new IndexTask( - null, - null, - new IndexIngestionSpec( - new DataSchema( - "foo", - null, - new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, - new UniformGranularitySpec( - Granularities.DAY, - null, - ImmutableList.of(Intervals.of("2010-01-01/P2D")) - ), - null, - jsonMapper - ), - new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTuningConfig( - null, - null, - 10, - null, - null, - 9999, - null, - null, - new DynamicPartitionsSpec(10000, null), - indexSpec, - null, - 3, - false, - null, - null, - null, - null, - null, - null, - null - ) - ), - new DataSchema( - "foo", - null, - new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, - new UniformGranularitySpec( - Granularities.DAY, - null, - ImmutableList.of(Intervals.of("2010-01-01/P2D")) - ), - null, - jsonMapper - ), - new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTuningConfig( - null, - null, - 10, - null, - null, - 9999, - null, - null, - new DynamicPartitionsSpec(10000, null), - indexSpec, - null, - 3, - false, - null, - null, - null, - null, - null, - null, - null - ), - null, - AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null, - rowIngestionMetersFactory, - null - ); - } - @Test public void testIndexTaskwithResourceSerde() throws Exception { @@ -476,9 +385,6 @@ public void testIndexTaskwithResourceSerde() throws Exception ) ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, rowIngestionMetersFactory, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index fb30ad6435f6..74c61d563eef 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -702,9 +702,6 @@ public void testIndexTask() throws Exception ) ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, ROW_INGESTION_METERS_FACTORY, @@ -789,9 +786,6 @@ public void testIndexTaskFailure() throws Exception ) ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, ROW_INGESTION_METERS_FACTORY, @@ -1214,9 +1208,6 @@ public void testResumeTasks() throws Exception ) ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, ROW_INGESTION_METERS_FACTORY, @@ -1323,9 +1314,6 @@ public void testUnifiedAppenderatorsManagerCleanup() throws Exception ) ), null, - null, - null, - null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null, ROW_INGESTION_METERS_FACTORY, From 47fd2bad7724444e7e5166c86245ee4e8a1a6fad Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 13 Nov 2019 12:52:46 -0800 Subject: [PATCH 06/12] fix indentation in docs --- docs/tutorials/tutorial-batch.md | 110 +++++++++++++++---------------- 1 file changed, 55 insertions(+), 55 deletions(-) diff --git a/docs/tutorials/tutorial-batch.md b/docs/tutorials/tutorial-batch.md index db5ad14e1e05..2e4db09032df 100644 --- a/docs/tutorials/tutorial-batch.md +++ b/docs/tutorials/tutorial-batch.md @@ -134,65 +134,65 @@ which has been configured to read the `quickstart/tutorial/wikiticker-2015-09-12 ```json { - "type" : "index", - "dataSchema" : { - "dataSource" : "wikipedia", - "parser" : { - "type" : "string", - "parseSpec" : { - "format" : "json", - "dimensionsSpec" : { - "dimensions" : [ - "channel", - "cityName", - "comment", - "countryIsoCode", - "countryName", - "isAnonymous", - "isMinor", - "isNew", - "isRobot", - "isUnpatrolled", - "metroCode", - "namespace", - "page", - "regionIsoCode", - "regionName", - "user", - { "name": "added", "type": "long" }, - { "name": "deleted", "type": "long" }, - { "name": "delta", "type": "long" } - ] - }, - "timestampSpec": { - "column": "time", - "format": "iso" - } + "type" : "index", + "dataSchema" : { + "dataSource" : "wikipedia", + "parser" : { + "type" : "string", + "parseSpec" : { + "format" : "json", + "dimensionsSpec" : { + "dimensions" : [ + "channel", + "cityName", + "comment", + "countryIsoCode", + "countryName", + "isAnonymous", + "isMinor", + "isNew", + "isRobot", + "isUnpatrolled", + "metroCode", + "namespace", + "page", + "regionIsoCode", + "regionName", + "user", + { "name": "added", "type": "long" }, + { "name": "deleted", "type": "long" }, + { "name": "delta", "type": "long" } + ] + }, + "timestampSpec": { + "column": "time", + "format": "iso" } - }, - "metricsSpec" : [], - "granularitySpec" : { - "type" : "uniform", - "segmentGranularity" : "day", - "queryGranularity" : "none", - "intervals" : ["2015-09-12/2015-09-13"], - "rollup" : false } }, - "ioConfig" : { - "type" : "index", - "firehose" : { - "type" : "local", - "baseDir" : "quickstart/tutorial/", - "filter" : "wikiticker-2015-09-12-sampled.json.gz" - }, - "appendToExisting" : false - }, - "tuningConfig" : { - "type" : "index", - "maxRowsPerSegment" : 5000000, - "maxRowsInMemory" : 25000 + "metricsSpec" : [], + "granularitySpec" : { + "type" : "uniform", + "segmentGranularity" : "day", + "queryGranularity" : "none", + "intervals" : ["2015-09-12/2015-09-13"], + "rollup" : false } + }, + "ioConfig" : { + "type" : "index", + "firehose" : { + "type" : "local", + "baseDir" : "quickstart/tutorial/", + "filter" : "wikiticker-2015-09-12-sampled.json.gz" + }, + "appendToExisting" : false + }, + "tuningConfig" : { + "type" : "index", + "maxRowsPerSegment" : 5000000, + "maxRowsInMemory" : 25000 + } } ``` From 4c594528cb4bdfd2449867647ece19ba422775e8 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Sun, 17 Nov 2019 22:54:21 -0800 Subject: [PATCH 07/12] Allow spec to be specified in supervisor schema --- .../KafkaSupervisorIngestionSpec.java | 66 ++++++++++++++ .../kafka/supervisor/KafkaSupervisorSpec.java | 64 +++++++------- .../KafkaSupervisorTuningConfig.java | 30 +++++++ .../indexing/kafka/KafkaSamplerSpecTest.java | 1 + .../supervisor/KafkaSupervisorSpecTest.java | 85 +++++++++++++++++++ .../kafka/supervisor/KafkaSupervisorTest.java | 3 + .../KinesisSupervisorIngestionSpec.java | 67 +++++++++++++++ .../supervisor/KinesisSupervisorSpec.java | 71 ++++++++-------- .../KinesisSupervisorTuningConfig.java | 37 ++++++++ .../kinesis/KinesisSamplerSpecTest.java | 1 + .../supervisor/KinesisSupervisorTest.java | 4 + ...SeekableStreamSupervisorIngestionSpec.java | 61 +++++++++++++ .../SeekableStreamSupervisorSpec.java | 10 ++- 13 files changed, 432 insertions(+), 68 deletions(-) create mode 100644 extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIngestionSpec.java create mode 100644 extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIngestionSpec.java create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIngestionSpec.java diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIngestionSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIngestionSpec.java new file mode 100644 index 000000000000..bbc00639f83a --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIngestionSpec.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kafka.supervisor; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec; +import org.apache.druid.segment.indexing.DataSchema; + +public class KafkaSupervisorIngestionSpec extends SeekableStreamSupervisorIngestionSpec +{ + private final DataSchema dataSchema; + private final KafkaSupervisorIOConfig ioConfig; + private final KafkaSupervisorTuningConfig tuningConfig; + + @JsonCreator + public KafkaSupervisorIngestionSpec( + @JsonProperty("dataSchema") DataSchema dataSchema, + @JsonProperty("ioConfig") KafkaSupervisorIOConfig ioConfig, + @JsonProperty("tuningConfig") KafkaSupervisorTuningConfig tuningConfig + ) + { + super(dataSchema, ioConfig, tuningConfig); + this.dataSchema = dataSchema; + this.ioConfig = ioConfig; + this.tuningConfig = tuningConfig == null ? KafkaSupervisorTuningConfig.defaultConfig() : tuningConfig; + } + + @Override + @JsonProperty("dataSchema") + public DataSchema getDataSchema() + { + return dataSchema; + } + + @Override + @JsonProperty("ioConfig") + public KafkaSupervisorIOConfig getIOConfig() + { + return ioConfig; + } + + @Override + @JsonProperty("tuningConfig") + public KafkaSupervisorTuningConfig getTuningConfig() + { + return tuningConfig; + } +} diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index 772a26ad07c3..8c2af0737288 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -42,8 +42,23 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec { private static final String TASK_TYPE = "kafka"; + private static KafkaSupervisorTuningConfig getTuningConfig( + KafkaSupervisorIngestionSpec ingestionSchema, + KafkaSupervisorTuningConfig tuningConfig + ) + { + if (ingestionSchema != null) { + return ingestionSchema.getTuningConfig() != null + ? ingestionSchema.getTuningConfig() + : KafkaSupervisorTuningConfig.defaultConfig(); + } else { + return tuningConfig != null ? tuningConfig : KafkaSupervisorTuningConfig.defaultConfig(); + } + } + @JsonCreator public KafkaSupervisorSpec( + @JsonProperty("spec") KafkaSupervisorIngestionSpec ingestionSchema, @JsonProperty("dataSchema") DataSchema dataSchema, @JsonProperty("tuningConfig") KafkaSupervisorTuningConfig tuningConfig, @JsonProperty("ioConfig") KafkaSupervisorIOConfig ioConfig, @@ -61,36 +76,10 @@ public KafkaSupervisorSpec( ) { super( - dataSchema, - tuningConfig != null - ? tuningConfig - : new KafkaSupervisorTuningConfig( - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null - ), - ioConfig, + ingestionSchema, + ingestionSchema != null ? ingestionSchema.getDataSchema() : dataSchema, + getTuningConfig(ingestionSchema, tuningConfig), + ingestionSchema != null ? ingestionSchema.getIOConfig() : ioConfig, context, suspended, taskStorage, @@ -131,6 +120,13 @@ public Supervisor createSupervisor() ); } + @Override + @JsonProperty + public DataSchema getDataSchema() + { + return super.getDataSchema(); + } + @Override @JsonProperty public KafkaSupervisorTuningConfig getTuningConfig() @@ -145,10 +141,18 @@ public KafkaSupervisorIOConfig getIoConfig() return (KafkaSupervisorIOConfig) super.getIoConfig(); } + @Override + @JsonProperty + public KafkaSupervisorIngestionSpec getSpec() + { + return (KafkaSupervisorIngestionSpec) super.getSpec(); + } + @Override protected KafkaSupervisorSpec toggleSuspend(boolean suspend) { return new KafkaSupervisorSpec( + getSpec(), getDataSchema(), getTuningConfig(), getIoConfig(), diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index 5d5592afacf9..5c00988e8601 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -43,6 +43,36 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig private final Duration shutdownTimeout; private final Duration offsetFetchPeriod; + public static KafkaSupervisorTuningConfig defaultConfig() + { + return new KafkaSupervisorTuningConfig( + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + } + public KafkaSupervisorTuningConfig( @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java index 5ede763b8d62..47d84d3f4ac0 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java @@ -135,6 +135,7 @@ public void testSample() insertData(generateRecords(TOPIC)); KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec( + null, DATA_SCHEMA, null, new KafkaSupervisorIOConfig( diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java index 1506d9894a27..8e12c8f650cb 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java @@ -146,6 +146,91 @@ public void testSerde() throws IOException Assert.assertEquals(serialized, stable); } + @Test + public void testSerdeWithSpec() throws IOException + { + String json = "{\n" + + " \"type\": \"kafka\",\n" + + " \"spec\": {\n" + + " \"dataSchema\": {\n" + + " \"dataSource\": \"metrics-kafka\",\n" + + " \"parser\": {\n" + + " \"type\": \"string\",\n" + + " \"parseSpec\": {\n" + + " \"format\": \"json\",\n" + + " \"timestampSpec\": {\n" + + " \"column\": \"timestamp\",\n" + + " \"format\": \"auto\"\n" + + " },\n" + + " \"dimensionsSpec\": {\n" + + " \"dimensions\": [],\n" + + " \"dimensionExclusions\": [\n" + + " \"timestamp\",\n" + + " \"value\"\n" + + " ]\n" + + " }\n" + + " }\n" + + " },\n" + + " \"metricsSpec\": [\n" + + " {\n" + + " \"name\": \"count\",\n" + + " \"type\": \"count\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_sum\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleSum\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_min\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMin\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_max\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMax\"\n" + + " }\n" + + " ],\n" + + " \"granularitySpec\": {\n" + + " \"type\": \"uniform\",\n" + + " \"segmentGranularity\": \"HOUR\",\n" + + " \"queryGranularity\": \"NONE\"\n" + + " }\n" + + " },\n" + + " \"ioConfig\": {\n" + + " \"topic\": \"metrics\",\n" + + " \"consumerProperties\": {\n" + + " \"bootstrap.servers\": \"localhost:9092\"\n" + + " },\n" + + " \"taskCount\": 1\n" + + " }\n" + + " }\n" + + "}"; + KafkaSupervisorSpec spec = mapper.readValue(json, KafkaSupervisorSpec.class); + + Assert.assertNotNull(spec); + Assert.assertNotNull(spec.getDataSchema()); + Assert.assertEquals(4, spec.getDataSchema().getAggregators().length); + Assert.assertNotNull(spec.getIoConfig()); + Assert.assertEquals("metrics", spec.getIoConfig().getTopic()); + Assert.assertNotNull(spec.getTuningConfig()); + Assert.assertNull(spec.getContext()); + Assert.assertFalse(spec.isSuspended()); + String serialized = mapper.writeValueAsString(spec); + + // expect default values populated in reserialized string + Assert.assertTrue(serialized.contains("\"tuningConfig\":{")); + Assert.assertTrue(serialized.contains("\"indexSpec\":{")); + Assert.assertTrue(serialized.contains("\"suspended\":false")); + + KafkaSupervisorSpec spec2 = mapper.readValue(serialized, KafkaSupervisorSpec.class); + + String stable = mapper.writeValueAsString(spec2); + + Assert.assertEquals(serialized, stable); + } + @Test public void testSuspendResume() throws IOException { diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 89ec2b6541c8..1bb6f50f5319 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -3288,6 +3288,7 @@ public KafkaIndexTaskClient build( taskClientFactory, OBJECT_MAPPER, new KafkaSupervisorSpec( + null, dataSchema, tuningConfig, kafkaSupervisorIOConfig, @@ -3396,6 +3397,7 @@ public KafkaIndexTaskClient build( taskClientFactory, OBJECT_MAPPER, new KafkaSupervisorSpec( + null, dataSchema, tuningConfig, kafkaSupervisorIOConfig, @@ -3481,6 +3483,7 @@ public KafkaIndexTaskClient build( taskClientFactory, OBJECT_MAPPER, new KafkaSupervisorSpec( + null, dataSchema, tuningConfig, kafkaSupervisorIOConfig, diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIngestionSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIngestionSpec.java new file mode 100644 index 000000000000..706589108155 --- /dev/null +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIngestionSpec.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kinesis.supervisor; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec; +import org.apache.druid.segment.indexing.DataSchema; + +public class KinesisSupervisorIngestionSpec extends SeekableStreamSupervisorIngestionSpec +{ + private final DataSchema dataSchema; + private final KinesisSupervisorIOConfig ioConfig; + private final KinesisSupervisorTuningConfig tuningConfig; + + @JsonCreator + public KinesisSupervisorIngestionSpec( + @JsonProperty("dataSchema") DataSchema dataSchema, + @JsonProperty("ioConfig") KinesisSupervisorIOConfig ioConfig, + @JsonProperty("tuningConfig") KinesisSupervisorTuningConfig tuningConfig + ) + { + super(dataSchema, ioConfig, tuningConfig); + this.dataSchema = dataSchema; + this.ioConfig = ioConfig; + this.tuningConfig = tuningConfig == null ? KinesisSupervisorTuningConfig.defaultConfig() : tuningConfig; + } + + @Override + @JsonProperty("dataSchema") + public DataSchema getDataSchema() + { + return dataSchema; + } + + @Override + @JsonProperty("ioConfig") + public KinesisSupervisorIOConfig getIOConfig() + { + return ioConfig; + } + + @Override + @JsonProperty("tuningConfig") + public KinesisSupervisorTuningConfig getTuningConfig() + { + return tuningConfig; + } +} + diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java index fb5751b84c43..c50413054265 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java @@ -45,8 +45,23 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec private static final String SUPERVISOR_TYPE = "kinesis"; private final AWSCredentialsConfig awsCredentialsConfig; + private static KinesisSupervisorTuningConfig getTuningConfig( + KinesisSupervisorIngestionSpec ingestionSchema, + KinesisSupervisorTuningConfig tuningConfig + ) + { + if (ingestionSchema != null) { + return ingestionSchema.getTuningConfig() != null + ? ingestionSchema.getTuningConfig() + : KinesisSupervisorTuningConfig.defaultConfig(); + } else { + return tuningConfig != null ? tuningConfig : KinesisSupervisorTuningConfig.defaultConfig(); + } + } + @JsonCreator public KinesisSupervisorSpec( + @JsonProperty("spec") final KinesisSupervisorIngestionSpec ingestionSchema, @JsonProperty("dataSchema") DataSchema dataSchema, @JsonProperty("tuningConfig") KinesisSupervisorTuningConfig tuningConfig, @JsonProperty("ioConfig") KinesisSupervisorIOConfig ioConfig, @@ -65,43 +80,10 @@ public KinesisSupervisorSpec( ) { super( - dataSchema, - tuningConfig != null - ? tuningConfig - : new KinesisSupervisorTuningConfig( - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null - ), - ioConfig, + ingestionSchema, + ingestionSchema != null ? ingestionSchema.getDataSchema() : dataSchema, + getTuningConfig(ingestionSchema, tuningConfig), + ingestionSchema != null ? ingestionSchema.getIOConfig() : ioConfig, context, suspended, taskStorage, @@ -157,6 +139,13 @@ public String toString() '}'; } + @Override + @JsonProperty + public DataSchema getDataSchema() + { + return super.getDataSchema(); + } + @Override @JsonProperty public KinesisSupervisorTuningConfig getTuningConfig() @@ -171,10 +160,18 @@ public KinesisSupervisorIOConfig getIoConfig() return (KinesisSupervisorIOConfig) super.getIoConfig(); } + @Override + @JsonProperty + public KinesisSupervisorIngestionSpec getSpec() + { + return (KinesisSupervisorIngestionSpec) super.getSpec(); + } + @Override protected KinesisSupervisorSpec toggleSuspend(boolean suspend) { return new KinesisSupervisorSpec( + getSpec(), getDataSchema(), getTuningConfig(), getIoConfig(), diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java index 91da145b1add..bc3bbd2314a1 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java @@ -40,6 +40,43 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig private final Duration shutdownTimeout; private final Duration repartitionTransitionDuration; + public static KinesisSupervisorTuningConfig defaultConfig() + { + return new KinesisSupervisorTuningConfig( + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + } + public KinesisSupervisorTuningConfig( @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java index 3cfcd97208ea..2d0e8f1a01ad 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java @@ -145,6 +145,7 @@ public void testSample() throws Exception replayAll(); KinesisSupervisorSpec supervisorSpec = new KinesisSupervisorSpec( + null, DATA_SCHEMA, null, new KinesisSupervisorIOConfig( diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index b9a1ed6e01aa..c3eba4b368ae 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -4677,6 +4677,7 @@ public KinesisIndexTaskClient build( taskClientFactory, OBJECT_MAPPER, new KinesisSupervisorSpec( + null, dataSchema, tuningConfig, kinesisSupervisorIOConfig, @@ -4780,6 +4781,7 @@ public KinesisIndexTaskClient build( taskClientFactory, OBJECT_MAPPER, new KinesisSupervisorSpec( + null, dataSchema, tuningConfig, kinesisSupervisorIOConfig, @@ -4865,6 +4867,7 @@ public KinesisIndexTaskClient build( taskClientFactory, OBJECT_MAPPER, new KinesisSupervisorSpec( + null, dataSchema, tuningConfig, kinesisSupervisorIOConfig, @@ -4952,6 +4955,7 @@ public KinesisIndexTaskClient build( taskClientFactory, OBJECT_MAPPER, new KinesisSupervisorSpec( + null, dataSchema, tuningConfig, kinesisSupervisorIOConfig, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIngestionSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIngestionSpec.java new file mode 100644 index 000000000000..6fbf917b774c --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIngestionSpec.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.supervisor; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.segment.indexing.DataSchema; + +public abstract class SeekableStreamSupervisorIngestionSpec +{ + private final DataSchema dataSchema; + private final SeekableStreamSupervisorIOConfig ioConfig; + private final SeekableStreamSupervisorTuningConfig tuningConfig; + + @JsonCreator + public SeekableStreamSupervisorIngestionSpec( + @JsonProperty("dataSchema") DataSchema dataSchema, + @JsonProperty("ioConfig") SeekableStreamSupervisorIOConfig ioConfig, + @JsonProperty("tuningConfig") SeekableStreamSupervisorTuningConfig tuningConfig + ) + { + this.dataSchema = dataSchema; + this.ioConfig = ioConfig; + this.tuningConfig = tuningConfig; + } + + @JsonProperty("dataSchema") + public DataSchema getDataSchema() + { + return dataSchema; + } + + @JsonProperty("ioConfig") + public SeekableStreamSupervisorIOConfig getIOConfig() + { + return ioConfig; + } + + @JsonProperty("tuningConfig") + public SeekableStreamSupervisorTuningConfig getTuningConfig() + { + return tuningConfig; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java index 469821ac8e68..5c310b7b41a3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java @@ -50,6 +50,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec protected final SeekableStreamIndexTaskClientFactory indexTaskClientFactory; protected final ObjectMapper mapper; protected final RowIngestionMetersFactory rowIngestionMetersFactory; + private final SeekableStreamSupervisorIngestionSpec ingestionSchema; private final DataSchema dataSchema; private final SeekableStreamSupervisorTuningConfig tuningConfig; private final SeekableStreamSupervisorIOConfig ioConfig; @@ -62,6 +63,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec @JsonCreator public SeekableStreamSupervisorSpec( + @JsonProperty("spec") final SeekableStreamSupervisorIngestionSpec ingestionSchema, @JsonProperty("dataSchema") DataSchema dataSchema, @JsonProperty("tuningConfig") SeekableStreamSupervisorTuningConfig tuningConfig, @JsonProperty("ioConfig") SeekableStreamSupervisorIOConfig ioConfig, @@ -78,6 +80,7 @@ public SeekableStreamSupervisorSpec( @JacksonInject SupervisorStateManagerConfig supervisorStateManagerConfig ) { + this.ingestionSchema = ingestionSchema; this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema"); this.tuningConfig = tuningConfig; // null check done in concrete class this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); @@ -95,6 +98,12 @@ public SeekableStreamSupervisorSpec( this.supervisorStateManagerConfig = supervisorStateManagerConfig; } + @JsonProperty + public SeekableStreamSupervisorIngestionSpec getSpec() + { + return ingestionSchema; + } + @JsonProperty public DataSchema getDataSchema() { @@ -171,5 +180,4 @@ public boolean isSuspended() protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean suspend); - } From 65d0bf5e1ab760545fa8b4122e19cdc771cc7dd5 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Mon, 18 Nov 2019 10:31:50 -0800 Subject: [PATCH 08/12] undo IndexTask spec changes --- .../druid/indexing/common/task/IndexTask.java | 59 +------------ .../indexing/common/task/IndexTaskTest.java | 88 ------------------- .../indexing/common/task/TaskSerdeTest.java | 82 ----------------- 3 files changed, 1 insertion(+), 228 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 7844204ff197..b3fecb19c979 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -27,7 +27,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -146,32 +145,6 @@ private static String makeGroupId(IndexIngestionSpec ingestionSchema) return makeGroupId(ingestionSchema.ioConfig.appendToExisting, ingestionSchema.dataSchema.getDataSource()); } - private static String makeGroupId( - IndexIngestionSpec ingestionSchema, - DataSchema dataSchema, - IndexIOConfig ioConfig, - TuningConfig tuningConfig - ) - { - final boolean isValid = (ingestionSchema != null) ^ (dataSchema != null - && ioConfig != null - && tuningConfig != null); - if (!isValid) { - if (ingestionSchema == null) { - throw new ISE("invalid spec input, please add dataSchema, ioConfig, tuningConfig to spec"); - } else { - throw new ISE("invalid spec input, please either add spec section or dataSchema, ioConfig, tuningConfig"); - } - } - if (ingestionSchema == null) { - assert (ioConfig != null); - assert (dataSchema != null); - return makeGroupId(ioConfig.appendToExisting, dataSchema.getDataSource()); - } else { - return makeGroupId(ingestionSchema.ioConfig.appendToExisting, ingestionSchema.dataSchema.getDataSource()); - } - } - private static String makeGroupId(boolean isAppendToExisting, String dataSource) { if (isAppendToExisting) { @@ -213,8 +186,7 @@ private static String makeGroupId(boolean isAppendToExisting, String dataSource) @JsonIgnore private final AppenderatorsManager appenderatorsManager; - @Deprecated - @VisibleForTesting + @JsonCreator public IndexTask( @JsonProperty("id") final String id, @JsonProperty("resource") final TaskResource taskResource, @@ -240,35 +212,6 @@ public IndexTask( ); } - @JsonCreator - public IndexTask( - @JsonProperty("id") final String id, - @JsonProperty("resource") final TaskResource taskResource, - @Deprecated @JsonProperty("spec") final IndexIngestionSpec ingestionSchema, - @JsonProperty("dataSchema") DataSchema dataSchema, - @JsonProperty("ioConfig") IndexIOConfig ioConfig, - @JsonProperty("tuningConfig") IndexTuningConfig tuningConfig, - @JsonProperty("context") final Map context, - @JacksonInject AuthorizerMapper authorizerMapper, - @JacksonInject ChatHandlerProvider chatHandlerProvider, - @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory, - @JacksonInject AppenderatorsManager appenderatorsManager - ) - { - this( - id, - makeGroupId(ingestionSchema, dataSchema, ioConfig, tuningConfig), - taskResource, - ingestionSchema == null ? dataSchema.getDataSource() : ingestionSchema.dataSchema.getDataSource(), - ingestionSchema == null ? new IndexIngestionSpec(dataSchema, ioConfig, tuningConfig) : ingestionSchema, - context, - authorizerMapper, - chatHandlerProvider, - rowIngestionMetersFactory, - appenderatorsManager - ); - } - public IndexTask( String id, String groupId, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index dfb5b6632053..f0594724d325 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -50,7 +50,6 @@ import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig; import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec; import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; @@ -59,7 +58,6 @@ import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.aggregation.AggregatorFactory; -import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.filter.SelectorDimFilter; @@ -1635,92 +1633,6 @@ public void testOverwriteWithDifferentSegmentGranularity() throws Exception } } - @Test(expected = ISE.class) - public void testIndexTaskInvalidSpec() - { - new IndexTask( - null, - null, - new IndexIngestionSpec( - new DataSchema( - "foo", - null, - new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, - new UniformGranularitySpec( - Granularities.DAY, - null, - ImmutableList.of(Intervals.of("2010-01-01/P2D")) - ), - null, - jsonMapper - ), - new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTuningConfig( - null, - null, - 10, - null, - null, - 9999, - null, - null, - new DynamicPartitionsSpec(10000, null), - new IndexSpec(), - null, - 3, - false, - null, - null, - null, - null, - null, - null, - null - ) - ), - new DataSchema( - "foo", - null, - new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, - new UniformGranularitySpec( - Granularities.DAY, - null, - ImmutableList.of(Intervals.of("2010-01-01/P2D")) - ), - null, - jsonMapper - ), - new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTuningConfig( - null, - null, - 10, - null, - null, - 9999, - null, - null, - new DynamicPartitionsSpec(10000, null), - new IndexSpec(), - null, - 3, - false, - null, - null, - null, - null, - null, - null, - null - ), - null, - AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null, - rowIngestionMetersFactory, - null - ); - } - public static void checkTaskStatusErrorMsgForParseExceptionsExceeded(TaskStatus status) { // full stacktrace will be too long and make tests brittle (e.g. if line # changes), just match the main message diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java index e9eb27eab501..2ba37ff0bd09 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java @@ -263,88 +263,6 @@ public void testIndexTaskSerde() throws Exception Assert.assertEquals(taskTuningConfig.isReportParseExceptions(), task2TuningConfig.isReportParseExceptions()); } - @Test - public void testIndexTaskNullSpecSerde() throws Exception - { - final IndexTask task = new IndexTask( - null, - null, - null, - new DataSchema( - "foo", - null, - new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, - new UniformGranularitySpec( - Granularities.DAY, - null, - ImmutableList.of(Intervals.of("2010-01-01/P2D")) - ), - null, - jsonMapper - ), - new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTuningConfig( - null, - null, - 10, - null, - null, - 9999, - null, - null, - new DynamicPartitionsSpec(10000, null), - indexSpec, - null, - 3, - false, - null, - null, - null, - null, - null, - null, - null - ), - null, - AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null, - rowIngestionMetersFactory, - null - ); - - final String json = jsonMapper.writeValueAsString(task); - - final IndexTask task2 = (IndexTask) jsonMapper.readValue(json, Task.class); - - Assert.assertEquals("foo", task.getDataSource()); - - Assert.assertEquals(task.getId(), task2.getId()); - Assert.assertEquals(task.getGroupId(), task2.getGroupId()); - Assert.assertEquals(task.getDataSource(), task2.getDataSource()); - - IndexTask.IndexIOConfig taskIoConfig = task.getIngestionSchema().getIOConfig(); - IndexTask.IndexIOConfig task2IoConfig = task2.getIngestionSchema().getIOConfig(); - - Assert.assertTrue(taskIoConfig.getFirehoseFactory() instanceof LocalFirehoseFactory); - Assert.assertTrue(task2IoConfig.getFirehoseFactory() instanceof LocalFirehoseFactory); - Assert.assertEquals(taskIoConfig.isAppendToExisting(), task2IoConfig.isAppendToExisting()); - - IndexTask.IndexTuningConfig taskTuningConfig = task.getIngestionSchema().getTuningConfig(); - IndexTask.IndexTuningConfig task2TuningConfig = task2.getIngestionSchema().getTuningConfig(); - - Assert.assertEquals(taskTuningConfig.getBasePersistDirectory(), task2TuningConfig.getBasePersistDirectory()); - Assert.assertEquals(taskTuningConfig.getIndexSpec(), task2TuningConfig.getIndexSpec()); - Assert.assertEquals( - taskTuningConfig.getIntermediatePersistPeriod(), - task2TuningConfig.getIntermediatePersistPeriod() - ); - Assert.assertEquals(taskTuningConfig.getMaxPendingPersists(), task2TuningConfig.getMaxPendingPersists()); - Assert.assertEquals(taskTuningConfig.getMaxRowsInMemory(), task2TuningConfig.getMaxRowsInMemory()); - Assert.assertEquals(taskTuningConfig.getNumShards(), task2TuningConfig.getNumShards()); - Assert.assertEquals(taskTuningConfig.getMaxRowsPerSegment(), task2TuningConfig.getMaxRowsPerSegment()); - Assert.assertEquals(taskTuningConfig.isReportParseExceptions(), task2TuningConfig.isReportParseExceptions()); - } - @Test public void testIndexTaskwithResourceSerde() throws Exception { From dc4cbed484306963bfb71fcc13ca8fb8133215f4 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Mon, 18 Nov 2019 11:11:44 -0800 Subject: [PATCH 09/12] update docs --- docs/tutorials/tutorial-batch.md | 108 +++++++++-------- docs/tutorials/tutorial-kafka.md | 104 ++++++++-------- .../quickstart/tutorial/wikipedia-index.json | 108 +++++++++-------- .../tutorial/wikipedia-kafka-supervisor.json | 113 ++++++++++-------- 4 files changed, 225 insertions(+), 208 deletions(-) diff --git a/docs/tutorials/tutorial-batch.md b/docs/tutorials/tutorial-batch.md index 2e4db09032df..e85613667af6 100644 --- a/docs/tutorials/tutorial-batch.md +++ b/docs/tutorials/tutorial-batch.md @@ -135,63 +135,65 @@ which has been configured to read the `quickstart/tutorial/wikiticker-2015-09-12 ```json { "type" : "index", - "dataSchema" : { - "dataSource" : "wikipedia", - "parser" : { - "type" : "string", - "parseSpec" : { - "format" : "json", - "dimensionsSpec" : { - "dimensions" : [ - "channel", - "cityName", - "comment", - "countryIsoCode", - "countryName", - "isAnonymous", - "isMinor", - "isNew", - "isRobot", - "isUnpatrolled", - "metroCode", - "namespace", - "page", - "regionIsoCode", - "regionName", - "user", - { "name": "added", "type": "long" }, - { "name": "deleted", "type": "long" }, - { "name": "delta", "type": "long" } - ] - }, - "timestampSpec": { - "column": "time", - "format": "iso" + "spec" : { + "dataSchema" : { + "dataSource" : "wikipedia", + "parser" : { + "type" : "string", + "parseSpec" : { + "format" : "json", + "dimensionsSpec" : { + "dimensions" : [ + "channel", + "cityName", + "comment", + "countryIsoCode", + "countryName", + "isAnonymous", + "isMinor", + "isNew", + "isRobot", + "isUnpatrolled", + "metroCode", + "namespace", + "page", + "regionIsoCode", + "regionName", + "user", + { "name": "added", "type": "long" }, + { "name": "deleted", "type": "long" }, + { "name": "delta", "type": "long" } + ] + }, + "timestampSpec": { + "column": "time", + "format": "iso" + } } + }, + "metricsSpec" : [], + "granularitySpec" : { + "type" : "uniform", + "segmentGranularity" : "day", + "queryGranularity" : "none", + "intervals" : ["2015-09-12/2015-09-13"], + "rollup" : false } }, - "metricsSpec" : [], - "granularitySpec" : { - "type" : "uniform", - "segmentGranularity" : "day", - "queryGranularity" : "none", - "intervals" : ["2015-09-12/2015-09-13"], - "rollup" : false - } - }, - "ioConfig" : { - "type" : "index", - "firehose" : { - "type" : "local", - "baseDir" : "quickstart/tutorial/", - "filter" : "wikiticker-2015-09-12-sampled.json.gz" + "ioConfig" : { + "type" : "index", + "firehose" : { + "type" : "local", + "baseDir" : "quickstart/tutorial/", + "filter" : "wikiticker-2015-09-12-sampled.json.gz" + }, + "appendToExisting" : false }, - "appendToExisting" : false - }, - "tuningConfig" : { - "type" : "index", - "maxRowsPerSegment" : 5000000, - "maxRowsInMemory" : 25000 + "tuningConfig" : { + "type" : "index", + "maxRowsPerSegment" : 5000000, + "maxRowsInMemory" : 25000 + } } } ``` diff --git a/docs/tutorials/tutorial-kafka.md b/docs/tutorials/tutorial-kafka.md index e7502ba444c0..6a7b8e30eb8e 100644 --- a/docs/tutorials/tutorial-kafka.md +++ b/docs/tutorials/tutorial-kafka.md @@ -180,60 +180,62 @@ Paste in this spec and click `Submit`. ```json { "type": "kafka", - "dataSchema": { - "dataSource": "wikipedia", - "parser": { - "type": "string", - "parseSpec": { - "format": "json", - "timestampSpec": { - "column": "time", - "format": "auto" - }, - "dimensionsSpec": { - "dimensions": [ - "channel", - "cityName", - "comment", - "countryIsoCode", - "countryName", - "isAnonymous", - "isMinor", - "isNew", - "isRobot", - "isUnpatrolled", - "metroCode", - "namespace", - "page", - "regionIsoCode", - "regionName", - "user", - { "name": "added", "type": "long" }, - { "name": "deleted", "type": "long" }, - { "name": "delta", "type": "long" } - ] + "spec" : { + "dataSchema": { + "dataSource": "wikipedia", + "parser": { + "type": "string", + "parseSpec": { + "format": "json", + "timestampSpec": { + "column": "time", + "format": "auto" + }, + "dimensionsSpec": { + "dimensions": [ + "channel", + "cityName", + "comment", + "countryIsoCode", + "countryName", + "isAnonymous", + "isMinor", + "isNew", + "isRobot", + "isUnpatrolled", + "metroCode", + "namespace", + "page", + "regionIsoCode", + "regionName", + "user", + { "name": "added", "type": "long" }, + { "name": "deleted", "type": "long" }, + { "name": "delta", "type": "long" } + ] + } } + }, + "metricsSpec" : [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "DAY", + "queryGranularity": "NONE", + "rollup": false } }, - "metricsSpec" : [], - "granularitySpec": { - "type": "uniform", - "segmentGranularity": "DAY", - "queryGranularity": "NONE", - "rollup": false - } - }, - "tuningConfig": { - "type": "kafka", - "reportParseExceptions": false - }, - "ioConfig": { - "topic": "wikipedia", - "replicas": 2, - "taskDuration": "PT10M", - "completionTimeout": "PT20M", - "consumerProperties": { - "bootstrap.servers": "localhost:9092" + "tuningConfig": { + "type": "kafka", + "reportParseExceptions": false + }, + "ioConfig": { + "topic": "wikipedia", + "replicas": 2, + "taskDuration": "PT10M", + "completionTimeout": "PT20M", + "consumerProperties": { + "bootstrap.servers": "localhost:9092" + } } } } diff --git a/examples/quickstart/tutorial/wikipedia-index.json b/examples/quickstart/tutorial/wikipedia-index.json index 84d90f8525c0..785fbda91679 100644 --- a/examples/quickstart/tutorial/wikipedia-index.json +++ b/examples/quickstart/tutorial/wikipedia-index.json @@ -1,61 +1,63 @@ { "type" : "index", - "dataSchema" : { - "dataSource" : "wikipedia", - "parser" : { - "type" : "string", - "parseSpec" : { - "format" : "json", - "dimensionsSpec" : { - "dimensions" : [ - "channel", - "cityName", - "comment", - "countryIsoCode", - "countryName", - "isAnonymous", - "isMinor", - "isNew", - "isRobot", - "isUnpatrolled", - "metroCode", - "namespace", - "page", - "regionIsoCode", - "regionName", - "user", - { "name": "added", "type": "long" }, - { "name": "deleted", "type": "long" }, - { "name": "delta", "type": "long" } - ] - }, - "timestampSpec": { - "column": "time", - "format": "iso" + "spec" : { + "dataSchema" : { + "dataSource" : "wikipedia", + "parser" : { + "type" : "string", + "parseSpec" : { + "format" : "json", + "dimensionsSpec" : { + "dimensions" : [ + "channel", + "cityName", + "comment", + "countryIsoCode", + "countryName", + "isAnonymous", + "isMinor", + "isNew", + "isRobot", + "isUnpatrolled", + "metroCode", + "namespace", + "page", + "regionIsoCode", + "regionName", + "user", + { "name": "added", "type": "long" }, + { "name": "deleted", "type": "long" }, + { "name": "delta", "type": "long" } + ] + }, + "timestampSpec": { + "column": "time", + "format": "iso" + } } + }, + "metricsSpec" : [], + "granularitySpec" : { + "type" : "uniform", + "segmentGranularity" : "day", + "queryGranularity" : "none", + "intervals" : ["2015-09-12/2015-09-13"], + "rollup" : false } }, - "metricsSpec" : [], - "granularitySpec" : { - "type" : "uniform", - "segmentGranularity" : "day", - "queryGranularity" : "none", - "intervals" : ["2015-09-12/2015-09-13"], - "rollup" : false - } - }, - "ioConfig" : { - "type" : "index", - "firehose" : { - "type" : "local", - "baseDir" : "quickstart/tutorial/", - "filter" : "wikiticker-2015-09-12-sampled.json.gz" + "ioConfig" : { + "type" : "index", + "firehose" : { + "type" : "local", + "baseDir" : "quickstart/tutorial/", + "filter" : "wikiticker-2015-09-12-sampled.json.gz" + }, + "appendToExisting" : false }, - "appendToExisting" : false - }, - "tuningConfig" : { - "type" : "index", - "maxRowsPerSegment" : 5000000, - "maxRowsInMemory" : 25000 + "tuningConfig" : { + "type" : "index", + "maxRowsPerSegment" : 5000000, + "maxRowsInMemory" : 25000 + } } } diff --git a/examples/quickstart/tutorial/wikipedia-kafka-supervisor.json b/examples/quickstart/tutorial/wikipedia-kafka-supervisor.json index 2a56955c1e34..7c1e62c34f92 100644 --- a/examples/quickstart/tutorial/wikipedia-kafka-supervisor.json +++ b/examples/quickstart/tutorial/wikipedia-kafka-supervisor.json @@ -1,59 +1,70 @@ { "type": "kafka", - "dataSchema": { - "dataSource": "wikipedia", - "parser": { - "type": "string", - "parseSpec": { - "format": "json", - "timestampSpec": { - "column": "time", - "format": "auto" - }, - "dimensionsSpec": { - "dimensions": [ - "channel", - "cityName", - "comment", - "countryIsoCode", - "countryName", - "isAnonymous", - "isMinor", - "isNew", - "isRobot", - "isUnpatrolled", - "metroCode", - "namespace", - "page", - "regionIsoCode", - "regionName", - "user", - { "name": "added", "type": "long" }, - { "name": "deleted", "type": "long" }, - { "name": "delta", "type": "long" } - ] + "spec" : { + "dataSchema": { + "dataSource": "wikipedia", + "parser": { + "type": "string", + "parseSpec": { + "format": "json", + "timestampSpec": { + "column": "time", + "format": "auto" + }, + "dimensionsSpec": { + "dimensions": [ + "channel", + "cityName", + "comment", + "countryIsoCode", + "countryName", + "isAnonymous", + "isMinor", + "isNew", + "isRobot", + "isUnpatrolled", + "metroCode", + "namespace", + "page", + "regionIsoCode", + "regionName", + "user", + { + "name": "added", + "type": "long" + }, + { + "name": "deleted", + "type": "long" + }, + { + "name": "delta", + "type": "long" + } + ] + } } + }, + "metricsSpec": [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "DAY", + "queryGranularity": "NONE", + "rollup": false } }, - "metricsSpec" : [], - "granularitySpec": { - "type": "uniform", - "segmentGranularity": "DAY", - "queryGranularity": "NONE", - "rollup": false - } - }, - "tuningConfig": { - "type": "kafka", - "reportParseExceptions": false - }, - "ioConfig": { - "topic": "wikipedia", - "replicas": 1, - "taskDuration": "PT10M", - "completionTimeout": "PT20M", - "consumerProperties": { - "bootstrap.servers": "localhost:9092" + "tuningConfig": { + "type": "kafka", + "reportParseExceptions": false + }, + "ioConfig": { + "topic": "wikipedia", + "replicas": 1, + "taskDuration": "PT10M", + "completionTimeout": "PT20M", + "consumerProperties": { + "bootstrap.servers": "localhost:9092" + } } } } From 9384bcef427903c80994302a6438acff40359eba Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 19 Nov 2019 15:42:14 -0800 Subject: [PATCH 10/12] Add Nullable and deprecated annotations --- .../kafka/supervisor/KafkaSupervisorSpec.java | 23 ++++++++----------- .../supervisor/KinesisSupervisorSpec.java | 19 +++++++-------- .../SeekableStreamSupervisorSpec.java | 1 + 3 files changed, 19 insertions(+), 24 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index 8c2af0737288..cb1b48e2e0b6 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -36,6 +36,7 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; +import javax.annotation.Nullable; import java.util.Map; public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec @@ -43,8 +44,8 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec private static final String TASK_TYPE = "kafka"; private static KafkaSupervisorTuningConfig getTuningConfig( - KafkaSupervisorIngestionSpec ingestionSchema, - KafkaSupervisorTuningConfig tuningConfig + @Nullable KafkaSupervisorIngestionSpec ingestionSchema, + @Nullable KafkaSupervisorTuningConfig tuningConfig ) { if (ingestionSchema != null) { @@ -58,10 +59,10 @@ private static KafkaSupervisorTuningConfig getTuningConfig( @JsonCreator public KafkaSupervisorSpec( - @JsonProperty("spec") KafkaSupervisorIngestionSpec ingestionSchema, - @JsonProperty("dataSchema") DataSchema dataSchema, - @JsonProperty("tuningConfig") KafkaSupervisorTuningConfig tuningConfig, - @JsonProperty("ioConfig") KafkaSupervisorIOConfig ioConfig, + @JsonProperty("spec") @Nullable KafkaSupervisorIngestionSpec ingestionSchema, + @JsonProperty("dataSchema") @Nullable DataSchema dataSchema, + @JsonProperty("tuningConfig") @Nullable KafkaSupervisorTuningConfig tuningConfig, + @JsonProperty("ioConfig") @Nullable KafkaSupervisorIOConfig ioConfig, @JsonProperty("context") Map context, @JsonProperty("suspended") Boolean suspended, @JacksonInject TaskStorage taskStorage, @@ -121,13 +122,7 @@ public Supervisor createSupervisor() } @Override - @JsonProperty - public DataSchema getDataSchema() - { - return super.getDataSchema(); - } - - @Override + @Deprecated @JsonProperty public KafkaSupervisorTuningConfig getTuningConfig() { @@ -135,6 +130,7 @@ public KafkaSupervisorTuningConfig getTuningConfig() } @Override + @Deprecated @JsonProperty public KafkaSupervisorIOConfig getIoConfig() { @@ -142,6 +138,7 @@ public KafkaSupervisorIOConfig getIoConfig() } @Override + @Nullable @JsonProperty public KafkaSupervisorIngestionSpec getSpec() { diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java index c50413054265..b953388892ac 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java @@ -38,6 +38,7 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; +import javax.annotation.Nullable; import java.util.Map; public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec @@ -61,10 +62,10 @@ private static KinesisSupervisorTuningConfig getTuningConfig( @JsonCreator public KinesisSupervisorSpec( - @JsonProperty("spec") final KinesisSupervisorIngestionSpec ingestionSchema, - @JsonProperty("dataSchema") DataSchema dataSchema, - @JsonProperty("tuningConfig") KinesisSupervisorTuningConfig tuningConfig, - @JsonProperty("ioConfig") KinesisSupervisorIOConfig ioConfig, + @JsonProperty("spec") @Nullable KinesisSupervisorIngestionSpec ingestionSchema, + @JsonProperty("dataSchema") @Nullable DataSchema dataSchema, + @JsonProperty("tuningConfig") @Nullable KinesisSupervisorTuningConfig tuningConfig, + @JsonProperty("ioConfig") @Nullable KinesisSupervisorIOConfig ioConfig, @JsonProperty("context") Map context, @JsonProperty("suspended") Boolean suspended, @JacksonInject TaskStorage taskStorage, @@ -140,13 +141,7 @@ public String toString() } @Override - @JsonProperty - public DataSchema getDataSchema() - { - return super.getDataSchema(); - } - - @Override + @Deprecated @JsonProperty public KinesisSupervisorTuningConfig getTuningConfig() { @@ -154,6 +149,7 @@ public KinesisSupervisorTuningConfig getTuningConfig() } @Override + @Deprecated @JsonProperty public KinesisSupervisorIOConfig getIoConfig() { @@ -161,6 +157,7 @@ public KinesisSupervisorIOConfig getIoConfig() } @Override + @Nullable @JsonProperty public KinesisSupervisorIngestionSpec getSpec() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java index 5c310b7b41a3..0b2d646d1537 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java @@ -104,6 +104,7 @@ public SeekableStreamSupervisorIngestionSpec getSpec() return ingestionSchema; } + @Deprecated @JsonProperty public DataSchema getDataSchema() { From e70e42f3b69517b18e33ce80b06e3fa5815b258c Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 19 Nov 2019 17:59:48 -0800 Subject: [PATCH 11/12] remove deprecated configs from SeekableStreamSupervisorSpec --- .../kafka/supervisor/KafkaSupervisorSpec.java | 27 ++++++----------- .../supervisor/KinesisSupervisorSpec.java | 27 ++++++----------- .../SeekableStreamSupervisorSpec.java | 30 ++++++++++--------- 3 files changed, 34 insertions(+), 50 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index cb1b48e2e0b6..a47f2c204006 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -43,20 +43,6 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec { private static final String TASK_TYPE = "kafka"; - private static KafkaSupervisorTuningConfig getTuningConfig( - @Nullable KafkaSupervisorIngestionSpec ingestionSchema, - @Nullable KafkaSupervisorTuningConfig tuningConfig - ) - { - if (ingestionSchema != null) { - return ingestionSchema.getTuningConfig() != null - ? ingestionSchema.getTuningConfig() - : KafkaSupervisorTuningConfig.defaultConfig(); - } else { - return tuningConfig != null ? tuningConfig : KafkaSupervisorTuningConfig.defaultConfig(); - } - } - @JsonCreator public KafkaSupervisorSpec( @JsonProperty("spec") @Nullable KafkaSupervisorIngestionSpec ingestionSchema, @@ -77,10 +63,15 @@ public KafkaSupervisorSpec( ) { super( - ingestionSchema, - ingestionSchema != null ? ingestionSchema.getDataSchema() : dataSchema, - getTuningConfig(ingestionSchema, tuningConfig), - ingestionSchema != null ? ingestionSchema.getIOConfig() : ioConfig, + ingestionSchema != null + ? ingestionSchema + : new KafkaSupervisorIngestionSpec( + dataSchema, + ioConfig, + tuningConfig != null + ? tuningConfig + : KafkaSupervisorTuningConfig.defaultConfig() + ), context, suspended, taskStorage, diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java index b953388892ac..b07d70f78362 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java @@ -46,20 +46,6 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec private static final String SUPERVISOR_TYPE = "kinesis"; private final AWSCredentialsConfig awsCredentialsConfig; - private static KinesisSupervisorTuningConfig getTuningConfig( - KinesisSupervisorIngestionSpec ingestionSchema, - KinesisSupervisorTuningConfig tuningConfig - ) - { - if (ingestionSchema != null) { - return ingestionSchema.getTuningConfig() != null - ? ingestionSchema.getTuningConfig() - : KinesisSupervisorTuningConfig.defaultConfig(); - } else { - return tuningConfig != null ? tuningConfig : KinesisSupervisorTuningConfig.defaultConfig(); - } - } - @JsonCreator public KinesisSupervisorSpec( @JsonProperty("spec") @Nullable KinesisSupervisorIngestionSpec ingestionSchema, @@ -81,10 +67,15 @@ public KinesisSupervisorSpec( ) { super( - ingestionSchema, - ingestionSchema != null ? ingestionSchema.getDataSchema() : dataSchema, - getTuningConfig(ingestionSchema, tuningConfig), - ingestionSchema != null ? ingestionSchema.getIOConfig() : ioConfig, + ingestionSchema != null + ? ingestionSchema + : new KinesisSupervisorIngestionSpec( + dataSchema, + ioConfig, + tuningConfig != null + ? tuningConfig + : KinesisSupervisorTuningConfig.defaultConfig() + ), context, suspended, taskStorage, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java index 0b2d646d1537..ad26c209b4f1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java @@ -44,6 +44,17 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec { + + private static SeekableStreamSupervisorIngestionSpec checkIngestionSchema( + SeekableStreamSupervisorIngestionSpec ingestionSchema + ) + { + Preconditions.checkNotNull(ingestionSchema, "ingestionSchema"); + Preconditions.checkNotNull(ingestionSchema.getDataSchema(), "dataSchema"); + Preconditions.checkNotNull(ingestionSchema.getIOConfig(), "ioConfig"); + return ingestionSchema; + } + protected final TaskStorage taskStorage; protected final TaskMaster taskMaster; protected final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; @@ -51,9 +62,6 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec protected final ObjectMapper mapper; protected final RowIngestionMetersFactory rowIngestionMetersFactory; private final SeekableStreamSupervisorIngestionSpec ingestionSchema; - private final DataSchema dataSchema; - private final SeekableStreamSupervisorTuningConfig tuningConfig; - private final SeekableStreamSupervisorIOConfig ioConfig; @Nullable private final Map context; protected final ServiceEmitter emitter; @@ -64,9 +72,6 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec @JsonCreator public SeekableStreamSupervisorSpec( @JsonProperty("spec") final SeekableStreamSupervisorIngestionSpec ingestionSchema, - @JsonProperty("dataSchema") DataSchema dataSchema, - @JsonProperty("tuningConfig") SeekableStreamSupervisorTuningConfig tuningConfig, - @JsonProperty("ioConfig") SeekableStreamSupervisorIOConfig ioConfig, @JsonProperty("context") @Nullable Map context, @JsonProperty("suspended") Boolean suspended, @JacksonInject TaskStorage taskStorage, @@ -80,10 +85,7 @@ public SeekableStreamSupervisorSpec( @JacksonInject SupervisorStateManagerConfig supervisorStateManagerConfig ) { - this.ingestionSchema = ingestionSchema; - this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema"); - this.tuningConfig = tuningConfig; // null check done in concrete class - this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); + this.ingestionSchema = checkIngestionSchema(ingestionSchema); this.context = context; this.taskStorage = taskStorage; @@ -108,19 +110,19 @@ public SeekableStreamSupervisorIngestionSpec getSpec() @JsonProperty public DataSchema getDataSchema() { - return dataSchema; + return ingestionSchema.getDataSchema(); } @JsonProperty public SeekableStreamSupervisorTuningConfig getTuningConfig() { - return tuningConfig; + return ingestionSchema.getTuningConfig(); } @JsonProperty public SeekableStreamSupervisorIOConfig getIoConfig() { - return ioConfig; + return ingestionSchema.getIOConfig(); } @Nullable @@ -138,7 +140,7 @@ public ServiceEmitter getEmitter() @Override public String getId() { - return dataSchema.getDataSource(); + return ingestionSchema.getDataSchema().getDataSource(); } public DruidMonitorSchedulerConfig getMonitorSchedulerConfig() From be92a1d4121bbb9c6d94bfdaf5f6343fe9d2925f Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 19 Nov 2019 22:09:14 -0800 Subject: [PATCH 12/12] remove nullable annotation --- .../druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java | 1 - .../druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java | 1 - 2 files changed, 2 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index a47f2c204006..5d1f4bfad85b 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -129,7 +129,6 @@ public KafkaSupervisorIOConfig getIoConfig() } @Override - @Nullable @JsonProperty public KafkaSupervisorIngestionSpec getSpec() { diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java index b07d70f78362..e9610c609703 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java @@ -148,7 +148,6 @@ public KinesisSupervisorIOConfig getIoConfig() } @Override - @Nullable @JsonProperty public KinesisSupervisorIngestionSpec getSpec() {