From 01f054a4bfd86ce23ea4f3a0bafcb0b877bc83a4 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 14 Jul 2020 14:03:25 -0700 Subject: [PATCH 01/20] append test --- .../indexer/AbstractITBatchIndexTest.java | 44 ++-- ...ractLocalInputSourceParallelIndexTest.java | 10 + .../tests/indexer/ITAppendBatchIndexTest.java | 229 ++++++++++++++++++ .../parallelized/ITSqlInputSourceTest.java | 2 +- .../indexer/wikipedia_compaction_task.json | 8 +- ...kipedia_local_input_source_index_task.json | 2 + 6 files changed, 273 insertions(+), 22 deletions(-) create mode 100644 integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java index 993ee6bac341..5b55b4e9ba7b 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java @@ -127,29 +127,33 @@ protected void doIndexTest( submitTaskAndWait(taskSpec, fullDatasourceName, waitForNewVersion, waitForSegmentsToLoad); if (runTestQueries) { - try { - - String queryResponseTemplate; - try { - InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath); - queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8); - } - catch (IOException e) { - throw new ISE(e, "could not read query file: %s", queryFilePath); - } - - queryResponseTemplate = StringUtils.replace( - queryResponseTemplate, - "%%DATASOURCE%%", - fullDatasourceName - ); - queryHelper.testQueriesFromString(queryResponseTemplate, 2); + doTestQuery(fullDatasourceName, queryFilePath, 2); + } + } + protected void doTestQuery(String fullDatasourceName, String queryFilePath, int timesToRun) + { + try { + String queryResponseTemplate; + try { + InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath); + queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8); } - catch (Exception e) { - LOG.error(e, "Error while testing"); - throw new RuntimeException(e); + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", queryFilePath); } + + queryResponseTemplate = StringUtils.replace( + queryResponseTemplate, + "%%DATASOURCE%%", + fullDatasourceName + ); + queryHelper.testQueriesFromString(queryResponseTemplate, timesToRun); + + } + catch (Exception e) { + LOG.error(e, "Error while testing"); + throw new RuntimeException(e); } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractLocalInputSourceParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractLocalInputSourceParallelIndexTest.java index a00bb3d83d47..82578d1e3625 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractLocalInputSourceParallelIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractLocalInputSourceParallelIndexTest.java @@ -70,6 +70,16 @@ public void doIndexTest(InputFormatDetails inputFormatDetails, @Nonnull Map partitionsSpecList) throws Exception + { + final String indexDatasource = "wikipedia_index_test_" + UUID.randomUUID(); + try ( + final Closeable ignored1 = unloader(indexDatasource + config.getExtraDatasourceNameSuffix()); + ) { + // Submit initial ingestion task + submitIngestionTaskAndVerify(indexDatasource, partitionsSpecList.get(0), false, INDEX_QUERIES_PRE_APPEND_RESOURCE); + // Submit append ingestion task + submitIngestionTaskAndVerify(indexDatasource, partitionsSpecList.get(1), true, INDEX_QUERIES_POST_APPEND_RESOURCE); + // Submit compaction task + final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(indexDatasource); + intervalsBeforeCompaction.sort(null); + compactData(indexDatasource); + // Verification post compaction + checkCompactionIntervals(indexDatasource, intervalsBeforeCompaction); + verifySegmentsCount(indexDatasource, 10); + verifySegmentsCompacted(indexDatasource, 10, 1000); + doTestQuery(indexDatasource, INDEX_QUERIES_POST_APPEND_RESOURCE, 2); + } + } + + private void submitIngestionTaskAndVerify( + String indexDatasource, + PartitionsSpec partitionsSpec, + boolean appendToExisting, + String indexQueriesResource + ) throws Exception + { + InputFormatDetails inputFormatDetails = InputFormatDetails.JSON; + Map inputFormatMap = new ImmutableMap.Builder().put("type", inputFormatDetails.getInputFormatType()) + .build(); + final Function sqlInputSourcePropsTransform = spec -> { + try { + spec = StringUtils.replace( + spec, + "%%PARTITIONS_SPEC%%", + jsonMapper.writeValueAsString(partitionsSpec) + ); + spec = StringUtils.replace( + spec, + "%%INPUT_SOURCE_FILTER%%", + "*" + inputFormatDetails.getFileExtension() + ); + spec = StringUtils.replace( + spec, + "%%INPUT_SOURCE_BASE_DIR%%", + "/resources/data/batch_index" + inputFormatDetails.getFolderSuffix() + ); + spec = StringUtils.replace( + spec, + "%%INPUT_FORMAT%%", + jsonMapper.writeValueAsString(inputFormatMap) + ); + spec = StringUtils.replace( + spec, + "%%APPEND_TO_EXISTING%%", + jsonMapper.writeValueAsString(appendToExisting) + ); + if (partitionsSpec instanceof DynamicPartitionsSpec) { + spec = StringUtils.replace( + spec, + "%%FORCE_GUARANTEED_ROLLUP%%", + jsonMapper.writeValueAsString(false) + ); + } else if (partitionsSpec instanceof HashedPartitionsSpec || partitionsSpec instanceof SingleDimensionPartitionsSpec) { + spec = StringUtils.replace( + spec, + "%%FORCE_GUARANTEED_ROLLUP%%", + jsonMapper.writeValueAsString(true) + ); + } + return spec; + } + catch (Exception e) { + throw new RuntimeException(e); + } + }; + + doIndexTest( + indexDatasource, + INDEX_TASK, + sqlInputSourcePropsTransform, + indexQueriesResource, + false, + true, + true + ); + } + + private void compactData(String fullDatasourceName) throws Exception + { + final String template = getResourceAsString(COMPACTION_TASK); + String taskSpec = StringUtils.replace(template, "%%DATASOURCE%%", fullDatasourceName); + + final String taskID = indexer.submitTask(taskSpec); + LOG.info("TaskID for compaction task %s", taskID); + indexer.waitUntilTaskCompletes(taskID); + + ITRetryUtil.retryUntilTrue( + () -> coordinator.areSegmentsLoaded(fullDatasourceName), + "Segment Compaction" + ); + } + + private void verifySegmentsCount(String fullDatasourceName, int numExpectedSegments) + { + ITRetryUtil.retryUntilTrue( + () -> { + int metadataSegmentCount = coordinator.getSegments(fullDatasourceName).size(); + LOG.info("Current metadata segment count: %d, expected: %d", metadataSegmentCount, numExpectedSegments); + return metadataSegmentCount == numExpectedSegments; + }, + "Compaction segment count check" + ); + } + + private void checkCompactionIntervals(String fullDatasourceName, List expectedIntervals) + { + ITRetryUtil.retryUntilTrue( + () -> { + final List actualIntervals = coordinator.getSegmentIntervals(fullDatasourceName); + actualIntervals.sort(null); + return actualIntervals.equals(expectedIntervals); + }, + "Compaction interval check" + ); + } + + private void verifySegmentsCompacted(String fullDatasourceName, int expectedCompactedSegmentCount, Integer expectedMaxRowsPerSegment) + { + List segments = coordinator.getFullSegmentsMetadata(fullDatasourceName); + List foundCompactedSegments = new ArrayList<>(); + for (DataSegment segment : segments) { + if (segment.getLastCompactionState() != null) { + foundCompactedSegments.add(segment); + } + } + Assert.assertEquals(foundCompactedSegments.size(), expectedCompactedSegmentCount); + for (DataSegment compactedSegment : foundCompactedSegments) { + Assert.assertNotNull(compactedSegment.getLastCompactionState()); + Assert.assertNotNull(compactedSegment.getLastCompactionState().getPartitionsSpec()); + Assert.assertEquals(compactedSegment.getLastCompactionState().getPartitionsSpec().getMaxRowsPerSegment(), + expectedMaxRowsPerSegment); + Assert.assertEquals(compactedSegment.getLastCompactionState().getPartitionsSpec().getType(), + SecondaryPartitionType.LINEAR + ); + } + } +} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITSqlInputSourceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITSqlInputSourceTest.java index 4efe83afcf2f..015bafe1584b 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITSqlInputSourceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITSqlInputSourceTest.java @@ -38,7 +38,7 @@ @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITSqlInputSourceTest extends AbstractITBatchIndexTest { - private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json"; + private static final String INDEX_TASK = "/indexer/wikipedia_parallel_index_using_sqlinputsource_task.json"; private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json"; @DataProvider(parallel = true) diff --git a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json index 8bb0ab91efbd..ca02e92fa9aa 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json @@ -1,5 +1,11 @@ { "type" : "compact", "dataSource" : "%%DATASOURCE%%", - "interval" : "2013-08-31/2013-09-02" + "ioConfig" : { + "type": "compact", + "inputSpec": { + "type": "interval", + "interval": "2013-08-31/2013-09-02" + } + } } \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/wikipedia_local_input_source_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_local_input_source_index_task.json index 67650f8e1be5..9d870cdc3251 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_local_input_source_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_local_input_source_index_task.json @@ -71,6 +71,7 @@ "filter" : "%%INPUT_SOURCE_FILTER%%", "baseDir": "%%INPUT_SOURCE_BASE_DIR%%" }, + "appendToExisting": %%APPEND_TO_EXISTING%%, "inputFormat": %%INPUT_FORMAT%% }, "tuningConfig": { @@ -80,6 +81,7 @@ "type": "maxSize", "maxSplitSize": 1 }, + "forceGuaranteedRollup": %%FORCE_GUARANTEED_ROLLUP%%, "partitionsSpec": %%PARTITIONS_SPEC%% } } From f8c8df8f9f0ac9dc4ee31ac02a3d40b635aca56f Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 14 Jul 2020 17:37:53 -0700 Subject: [PATCH 02/20] add append IT --- .../tests/indexer/ITAppendBatchIndexTest.java | 57 ++++--- .../indexer/wikipedia_compaction_task.json | 3 + ...dia_double_with_roll_up_index_queries.json | 143 ++++++++++++++++++ ..._double_without_roll_up_index_queries.json | 143 ++++++++++++++++++ 4 files changed, 321 insertions(+), 25 deletions(-) create mode 100644 integration-tests/src/test/resources/indexer/wikipedia_double_with_roll_up_index_queries.json create mode 100644 integration-tests/src/test/resources/indexer/wikipedia_double_without_roll_up_index_queries.json diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java index ab8a09da21ed..0b5b748c6cbc 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java @@ -50,24 +50,26 @@ public class ITAppendBatchIndexTest extends AbstractITBatchIndexTest { private static final Logger LOG = new Logger(ITAppendBatchIndexTest.class); private static final String INDEX_TASK = "/indexer/wikipedia_local_input_source_index_task.json"; - private static final String INDEX_QUERIES_PRE_APPEND_RESOURCE = "/indexer/wikipedia_index_queries.json"; - private static final String INDEX_QUERIES_POST_APPEND_RESOURCE = "/indexer/wikipedia_index_queries.json"; + private static final String INDEX_QUERIES_INITIAL_INGESTION__RESOURCE = "/indexer/wikipedia_index_queries.json"; + private static final String INDEX_QUERIES_POST_APPEND_PRE_COMPACT_RESOURCE = "/indexer/wikipedia_double_without_roll_up_index_queries.json"; + private static final String INDEX_QUERIES_POST_APPEND_POST_COMPACT_RESOURCE = "/indexer/wikipedia_double_with_roll_up_index_queries.json"; + private static final String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json"; @DataProvider(parallel = true) public static Object[][] resources() { return new Object[][]{ -// // First index with dynamically-partitioned then append dynamically-partitioned -// {ImmutableList.of( -// new DynamicPartitionsSpec(null, null), -// new DynamicPartitionsSpec(null, null) -// )}, -// // First index with hash-partitioned then append dynamically-partitioned -// {ImmutableList.of( -// new HashedPartitionsSpec(null, 3, ImmutableList.of("page", "user")), -// new DynamicPartitionsSpec(null, null) -// )}, + // First index with dynamically-partitioned then append dynamically-partitioned + {ImmutableList.of( + new DynamicPartitionsSpec(null, null), + new DynamicPartitionsSpec(null, null) + )}, + // First index with hash-partitioned then append dynamically-partitioned + {ImmutableList.of( + new HashedPartitionsSpec(null, 3, ImmutableList.of("page", "user")), + new DynamicPartitionsSpec(null, null) + )}, // First index with range-partitioned then append dynamically-partitioned {ImmutableList.of( new SingleDimensionPartitionsSpec(1000, null, "page", false), @@ -84,26 +86,29 @@ public void doIndexTest(List partitionsSpecList) throws Exceptio final Closeable ignored1 = unloader(indexDatasource + config.getExtraDatasourceNameSuffix()); ) { // Submit initial ingestion task - submitIngestionTaskAndVerify(indexDatasource, partitionsSpecList.get(0), false, INDEX_QUERIES_PRE_APPEND_RESOURCE); + submitIngestionTaskAndVerify(indexDatasource, partitionsSpecList.get(0), false); + verifySegmentsCountAndLoaded(indexDatasource, 2); + doTestQuery(indexDatasource, INDEX_QUERIES_INITIAL_INGESTION__RESOURCE, 2); // Submit append ingestion task - submitIngestionTaskAndVerify(indexDatasource, partitionsSpecList.get(1), true, INDEX_QUERIES_POST_APPEND_RESOURCE); + submitIngestionTaskAndVerify(indexDatasource, partitionsSpecList.get(1), true); + verifySegmentsCountAndLoaded(indexDatasource, 6); + doTestQuery(indexDatasource, INDEX_QUERIES_POST_APPEND_PRE_COMPACT_RESOURCE, 2); // Submit compaction task final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(indexDatasource); intervalsBeforeCompaction.sort(null); compactData(indexDatasource); // Verification post compaction checkCompactionIntervals(indexDatasource, intervalsBeforeCompaction); - verifySegmentsCount(indexDatasource, 10); - verifySegmentsCompacted(indexDatasource, 10, 1000); - doTestQuery(indexDatasource, INDEX_QUERIES_POST_APPEND_RESOURCE, 2); + verifySegmentsCountAndLoaded(indexDatasource, 2); + verifySegmentsCompacted(indexDatasource, 2); + doTestQuery(indexDatasource, INDEX_QUERIES_POST_APPEND_POST_COMPACT_RESOURCE, 2); } } private void submitIngestionTaskAndVerify( String indexDatasource, PartitionsSpec partitionsSpec, - boolean appendToExisting, - String indexQueriesResource + boolean appendToExisting ) throws Exception { InputFormatDetails inputFormatDetails = InputFormatDetails.JSON; @@ -160,9 +165,9 @@ private void submitIngestionTaskAndVerify( indexDatasource, INDEX_TASK, sqlInputSourcePropsTransform, - indexQueriesResource, + null, + false, false, - true, true ); } @@ -182,7 +187,7 @@ private void compactData(String fullDatasourceName) throws Exception ); } - private void verifySegmentsCount(String fullDatasourceName, int numExpectedSegments) + private void verifySegmentsCountAndLoaded(String fullDatasourceName, int numExpectedSegments) { ITRetryUtil.retryUntilTrue( () -> { @@ -192,6 +197,10 @@ private void verifySegmentsCount(String fullDatasourceName, int numExpectedSegme }, "Compaction segment count check" ); + ITRetryUtil.retryUntilTrue( + () -> coordinator.areSegmentsLoaded(fullDatasourceName), + "Segment Load" + ); } private void checkCompactionIntervals(String fullDatasourceName, List expectedIntervals) @@ -206,7 +215,7 @@ private void checkCompactionIntervals(String fullDatasourceName, List ex ); } - private void verifySegmentsCompacted(String fullDatasourceName, int expectedCompactedSegmentCount, Integer expectedMaxRowsPerSegment) + private void verifySegmentsCompacted(String fullDatasourceName, int expectedCompactedSegmentCount) { List segments = coordinator.getFullSegmentsMetadata(fullDatasourceName); List foundCompactedSegments = new ArrayList<>(); @@ -219,8 +228,6 @@ private void verifySegmentsCompacted(String fullDatasourceName, int expectedComp for (DataSegment compactedSegment : foundCompactedSegments) { Assert.assertNotNull(compactedSegment.getLastCompactionState()); Assert.assertNotNull(compactedSegment.getLastCompactionState().getPartitionsSpec()); - Assert.assertEquals(compactedSegment.getLastCompactionState().getPartitionsSpec().getMaxRowsPerSegment(), - expectedMaxRowsPerSegment); Assert.assertEquals(compactedSegment.getLastCompactionState().getPartitionsSpec().getType(), SecondaryPartitionType.LINEAR ); diff --git a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json index ca02e92fa9aa..fb620c11aa20 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json @@ -7,5 +7,8 @@ "type": "interval", "interval": "2013-08-31/2013-09-02" } + }, + "context" : { + "storeCompactionState" : true } } \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/wikipedia_double_with_roll_up_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_double_with_roll_up_index_queries.json new file mode 100644 index 000000000000..eaa9592ca266 --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_double_with_roll_up_index_queries.json @@ -0,0 +1,143 @@ +[ + { + "description": "timeseries, 1 agg, all", + "query":{ + "queryType" : "timeBoundary", + "dataSource": "%%DATASOURCE%%" + }, + "expectedResults":[ + { + "timestamp" : "2013-08-31T01:02:33.000Z", + "result" : { + "minTime" : "2013-08-31T01:02:33.000Z", + "maxTime" : "2013-09-01T12:41:27.000Z" + } + } + ] + }, + { + "description": "timeseries, datasketch aggs, all", + "query":{ + "queryType" : "timeseries", + "dataSource": "%%DATASOURCE%%", + "granularity":"day", + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ], + "filter":null, + "aggregations":[ + { + "type": "HLLSketchMerge", + "name": "approxCountHLL", + "fieldName": "HLLSketchBuild", + "lgK": 12, + "tgtHllType": "HLL_4", + "round": true + }, + { + "type":"thetaSketch", + "name":"approxCountTheta", + "fieldName":"thetaSketch", + "size":16384, + "shouldFinalize":true, + "isInputThetaSketch":false, + "errorBoundsStdDev":null + }, + { + "type":"quantilesDoublesSketch", + "name":"quantilesSketch", + "fieldName":"quantilesDoublesSketch", + "k":128 + } + ] + }, + "expectedResults":[ + { + "timestamp" : "2013-08-31T00:00:00.000Z", + "result" : { + "quantilesSketch":10, + "approxCountTheta":5.0, + "approxCountHLL":5 + } + } + ] + }, + { + "description":"having spec on post aggregation", + "query":{ + "queryType":"groupBy", + "dataSource":"%%DATASOURCE%%", + "granularity":"day", + "dimensions":[ + "page" + ], + "filter":{ + "type":"selector", + "dimension":"language", + "value":"zh" + }, + "aggregations":[ + { + "type":"count", + "name":"rows" + }, + { + "type":"longSum", + "fieldName":"added", + "name":"added_count" + } + ], + "postAggregations": [ + { + "type":"arithmetic", + "name":"added_count_times_ten", + "fn":"*", + "fields":[ + {"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"}, + {"type":"constant", "name":"const", "value":10} + ] + } + ], + "having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000}, + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ] + }, + "expectedResults":[ { + "version" : "v1", + "timestamp" : "2013-08-31T00:00:00.000Z", + "event" : { + "added_count_times_ten" : 18100.0, + "page" : "Crimson Typhoon", + "added_count" : 1810, + "rows" : 1 + } + } ] + }, + { + "description": "timeseries, count aggs, all", + "query":{ + "queryType" : "timeseries", + "dataSource": "%%DATASOURCE%%", + "granularity":"day", + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ], + "filter":null, + "aggregations":[ + { + "type": "count", + "name": "rows" + } + ] + }, + "expectedResults":[ + { + "timestamp" : "2013-08-31T00:00:00.000Z", + "result" : { + "rows":5 + } + } + ] + } +] \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/wikipedia_double_without_roll_up_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_double_without_roll_up_index_queries.json new file mode 100644 index 000000000000..586da63e3dbd --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_double_without_roll_up_index_queries.json @@ -0,0 +1,143 @@ +[ + { + "description": "timeseries, 1 agg, all", + "query":{ + "queryType" : "timeBoundary", + "dataSource": "%%DATASOURCE%%" + }, + "expectedResults":[ + { + "timestamp" : "2013-08-31T01:02:33.000Z", + "result" : { + "minTime" : "2013-08-31T01:02:33.000Z", + "maxTime" : "2013-09-01T12:41:27.000Z" + } + } + ] + }, + { + "description": "timeseries, datasketch aggs, all", + "query":{ + "queryType" : "timeseries", + "dataSource": "%%DATASOURCE%%", + "granularity":"day", + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ], + "filter":null, + "aggregations":[ + { + "type": "HLLSketchMerge", + "name": "approxCountHLL", + "fieldName": "HLLSketchBuild", + "lgK": 12, + "tgtHllType": "HLL_4", + "round": true + }, + { + "type":"thetaSketch", + "name":"approxCountTheta", + "fieldName":"thetaSketch", + "size":16384, + "shouldFinalize":true, + "isInputThetaSketch":false, + "errorBoundsStdDev":null + }, + { + "type":"quantilesDoublesSketch", + "name":"quantilesSketch", + "fieldName":"quantilesDoublesSketch", + "k":128 + } + ] + }, + "expectedResults":[ + { + "timestamp" : "2013-08-31T00:00:00.000Z", + "result" : { + "quantilesSketch":10, + "approxCountTheta":5.0, + "approxCountHLL":5 + } + } + ] + }, + { + "description":"having spec on post aggregation", + "query":{ + "queryType":"groupBy", + "dataSource":"%%DATASOURCE%%", + "granularity":"day", + "dimensions":[ + "page" + ], + "filter":{ + "type":"selector", + "dimension":"language", + "value":"zh" + }, + "aggregations":[ + { + "type":"count", + "name":"rows" + }, + { + "type":"longSum", + "fieldName":"added", + "name":"added_count" + } + ], + "postAggregations": [ + { + "type":"arithmetic", + "name":"added_count_times_ten", + "fn":"*", + "fields":[ + {"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"}, + {"type":"constant", "name":"const", "value":10} + ] + } + ], + "having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000}, + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ] + }, + "expectedResults":[ { + "version" : "v1", + "timestamp" : "2013-08-31T00:00:00.000Z", + "event" : { + "added_count_times_ten" : 18100.0, + "page" : "Crimson Typhoon", + "added_count" : 1810, + "rows" : 2 + } + } ] + }, + { + "description": "timeseries, count aggs, all", + "query":{ + "queryType" : "timeseries", + "dataSource": "%%DATASOURCE%%", + "granularity":"day", + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ], + "filter":null, + "aggregations":[ + { + "type": "count", + "name": "rows" + } + ] + }, + "expectedResults":[ + { + "timestamp" : "2013-08-31T00:00:00.000Z", + "result" : { + "rows":10 + } + } + ] + } +] \ No newline at end of file From 9f30c98792852d0a32df7b7b4861d8a9f5a69111 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 14 Jul 2020 17:39:55 -0700 Subject: [PATCH 03/20] fix checkstyle --- .../apache/druid/tests/indexer/ITAppendBatchIndexTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java index 0b5b748c6cbc..2505b96e1de1 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java @@ -50,7 +50,7 @@ public class ITAppendBatchIndexTest extends AbstractITBatchIndexTest { private static final Logger LOG = new Logger(ITAppendBatchIndexTest.class); private static final String INDEX_TASK = "/indexer/wikipedia_local_input_source_index_task.json"; - private static final String INDEX_QUERIES_INITIAL_INGESTION__RESOURCE = "/indexer/wikipedia_index_queries.json"; + private static final String INDEX_QUERIES_INITIAL_INGESTION_RESOURCE = "/indexer/wikipedia_index_queries.json"; private static final String INDEX_QUERIES_POST_APPEND_PRE_COMPACT_RESOURCE = "/indexer/wikipedia_double_without_roll_up_index_queries.json"; private static final String INDEX_QUERIES_POST_APPEND_POST_COMPACT_RESOURCE = "/indexer/wikipedia_double_with_roll_up_index_queries.json"; @@ -88,7 +88,7 @@ public void doIndexTest(List partitionsSpecList) throws Exceptio // Submit initial ingestion task submitIngestionTaskAndVerify(indexDatasource, partitionsSpecList.get(0), false); verifySegmentsCountAndLoaded(indexDatasource, 2); - doTestQuery(indexDatasource, INDEX_QUERIES_INITIAL_INGESTION__RESOURCE, 2); + doTestQuery(indexDatasource, INDEX_QUERIES_INITIAL_INGESTION_RESOURCE, 2); // Submit append ingestion task submitIngestionTaskAndVerify(indexDatasource, partitionsSpecList.get(1), true); verifySegmentsCountAndLoaded(indexDatasource, 6); From ed74f8388bb58efad9fc156a08f71d6d847cdfa0 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 14 Jul 2020 22:00:01 -0700 Subject: [PATCH 04/20] fix checkstyle --- .../tests/indexer/ITAppendBatchIndexTest.java | 35 ++++++++++++------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java index 2505b96e1de1..b565e31edfa4 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java @@ -61,25 +61,34 @@ public static Object[][] resources() { return new Object[][]{ // First index with dynamically-partitioned then append dynamically-partitioned - {ImmutableList.of( + { + ImmutableList.of( new DynamicPartitionsSpec(null, null), new DynamicPartitionsSpec(null, null) - )}, + ), + ImmutableList.of(4, 8, 2) + }, // First index with hash-partitioned then append dynamically-partitioned - {ImmutableList.of( + { + ImmutableList.of( new HashedPartitionsSpec(null, 3, ImmutableList.of("page", "user")), new DynamicPartitionsSpec(null, null) - )}, + ), + ImmutableList.of(6, 10, 2) + }, // First index with range-partitioned then append dynamically-partitioned - {ImmutableList.of( + { + ImmutableList.of( new SingleDimensionPartitionsSpec(1000, null, "page", false), new DynamicPartitionsSpec(null, null) - )} + ), + ImmutableList.of(2, 6, 2) + } }; } @Test(dataProvider = "resources") - public void doIndexTest(List partitionsSpecList) throws Exception + public void doIndexTest(List partitionsSpecList, List expectedSegmentCountList) throws Exception { final String indexDatasource = "wikipedia_index_test_" + UUID.randomUUID(); try ( @@ -87,11 +96,11 @@ public void doIndexTest(List partitionsSpecList) throws Exceptio ) { // Submit initial ingestion task submitIngestionTaskAndVerify(indexDatasource, partitionsSpecList.get(0), false); - verifySegmentsCountAndLoaded(indexDatasource, 2); + verifySegmentsCountAndLoaded(indexDatasource, expectedSegmentCountList.get(0)); doTestQuery(indexDatasource, INDEX_QUERIES_INITIAL_INGESTION_RESOURCE, 2); // Submit append ingestion task submitIngestionTaskAndVerify(indexDatasource, partitionsSpecList.get(1), true); - verifySegmentsCountAndLoaded(indexDatasource, 6); + verifySegmentsCountAndLoaded(indexDatasource, expectedSegmentCountList.get(1)); doTestQuery(indexDatasource, INDEX_QUERIES_POST_APPEND_PRE_COMPACT_RESOURCE, 2); // Submit compaction task final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(indexDatasource); @@ -99,8 +108,8 @@ public void doIndexTest(List partitionsSpecList) throws Exceptio compactData(indexDatasource); // Verification post compaction checkCompactionIntervals(indexDatasource, intervalsBeforeCompaction); - verifySegmentsCountAndLoaded(indexDatasource, 2); - verifySegmentsCompacted(indexDatasource, 2); + verifySegmentsCountAndLoaded(indexDatasource, expectedSegmentCountList.get(2)); + verifySegmentsCompacted(indexDatasource, expectedSegmentCountList.get(2)); doTestQuery(indexDatasource, INDEX_QUERIES_POST_APPEND_POST_COMPACT_RESOURCE, 2); } } @@ -195,11 +204,11 @@ private void verifySegmentsCountAndLoaded(String fullDatasourceName, int numExpe LOG.info("Current metadata segment count: %d, expected: %d", metadataSegmentCount, numExpectedSegments); return metadataSegmentCount == numExpectedSegments; }, - "Compaction segment count check" + "Segment count check" ); ITRetryUtil.retryUntilTrue( () -> coordinator.areSegmentsLoaded(fullDatasourceName), - "Segment Load" + "Segment load check" ); } From f3c7ead602bea2e276119e7bf9d46d6f608457aa Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 16 Jul 2020 17:53:26 -0700 Subject: [PATCH 05/20] Remove parallel --- .../org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java index b565e31edfa4..e9727a954abe 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java @@ -56,7 +56,6 @@ public class ITAppendBatchIndexTest extends AbstractITBatchIndexTest private static final String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json"; - @DataProvider(parallel = true) public static Object[][] resources() { return new Object[][]{ From bdf7659467e7b88afb5c6f8034fd63221a3deffe Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 16 Jul 2020 17:55:09 -0700 Subject: [PATCH 06/20] fix checkstyle --- .../org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java index e9727a954abe..66ad7bf98535 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java @@ -33,7 +33,6 @@ import org.apache.druid.tests.TestNGGroup; import org.apache.druid.timeline.DataSegment; import org.testng.Assert; -import org.testng.annotations.DataProvider; import org.testng.annotations.Guice; import org.testng.annotations.Test; From ba4f75a6f09cb2e5b57efdd3ab25abc3200d8717 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 16 Jul 2020 23:32:46 -0700 Subject: [PATCH 07/20] fix --- .../org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java index 66ad7bf98535..2fc93f436c3e 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java @@ -33,6 +33,7 @@ import org.apache.druid.tests.TestNGGroup; import org.apache.druid.timeline.DataSegment; import org.testng.Assert; +import org.testng.annotations.DataProvider; import org.testng.annotations.Guice; import org.testng.annotations.Test; @@ -55,6 +56,7 @@ public class ITAppendBatchIndexTest extends AbstractITBatchIndexTest private static final String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json"; + @DataProvider() public static Object[][] resources() { return new Object[][]{ From fc878da5e433ce05efdbcf9bfd1b9f60575d763e Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 17 Jul 2020 01:37:43 -0700 Subject: [PATCH 08/20] fix --- .../org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java index 2fc93f436c3e..321d493acc95 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java @@ -56,7 +56,7 @@ public class ITAppendBatchIndexTest extends AbstractITBatchIndexTest private static final String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json"; - @DataProvider() + @DataProvider public static Object[][] resources() { return new Object[][]{ From 28543b3a9c91ea1f78a127ae04a8e4594643def8 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 17 Jul 2020 11:43:26 -0700 Subject: [PATCH 09/20] address comments --- .travis.yml | 17 +++++++++++++++-- .../clients/CoordinatorResourceTestClient.java | 10 +++++----- .../org/apache/druid/tests/TestNGGroup.java | 2 ++ .../tests/indexer/ITAppendBatchIndexTest.java | 12 +++++++++--- .../parallelized/ITSqlInputSourceTest.java | 2 +- ...stion_non_perfect_rollup_index_queries.json} | 0 ...ingestion_perfect_rollup_index_queries.json} | 0 7 files changed, 32 insertions(+), 11 deletions(-) rename integration-tests/src/test/resources/indexer/{wikipedia_double_without_roll_up_index_queries.json => wikipedia_double_ingestion_non_perfect_rollup_index_queries.json} (100%) rename integration-tests/src/test/resources/indexer/{wikipedia_double_with_roll_up_index_queries.json => wikipedia_double_ingestion_perfect_rollup_index_queries.json} (100%) diff --git a/.travis.yml b/.travis.yml index af0192d0efff..f3b42c48bf3c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -393,11 +393,19 @@ jobs: script: *run_integration_test after_failure: *integration_test_diags + - &integration_append_ingestion + name: "(Compile=openjdk8, Run=openjdk8) append ingestion integration test" + jdk: openjdk8 + services: *integration_test_services + env: TESTNG_GROUPS='-Dgroups=append-ingestion' JVM_RUNTIME='-Djvm.runtime=8' + script: *run_integration_test + after_failure: *integration_test_diags + - &integration_tests name: "(Compile=openjdk8, Run=openjdk8) other integration test" jdk: openjdk8 services: *integration_test_services - env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage' JVM_RUNTIME='-Djvm.runtime=11' + env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion' JVM_RUNTIME='-Djvm.runtime=11' script: *run_integration_test after_failure: *integration_test_diags # END - Integration tests for Compile with Java 8 and Run with Java 8 @@ -433,10 +441,15 @@ jobs: jdk: openjdk8 env: TESTNG_GROUPS='-Dgroups=realtime-index' JVM_RUNTIME='-Djvm.runtime=11' + - <<: *integration_append_ingestion + name: "(Compile=openjdk8, Run=openjdk11) append ingestion integration test" + jdk: openjdk8 + env: TESTNG_GROUPS='-Dgroups=append-ingestion' JVM_RUNTIME='-Djvm.runtime=11' + - <<: *integration_tests name: "(Compile=openjdk8, Run=openjdk11) other integration test" jdk: openjdk8 - env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage' JVM_RUNTIME='-Djvm.runtime=11' + env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion' JVM_RUNTIME='-Djvm.runtime=11' # END - Integration tests for Compile with Java 8 and Run with Java 11 - name: "security vulnerabilities" diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java index b369ac34b7a9..6e98cf783cae 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java @@ -95,9 +95,9 @@ private String getFullSegmentsURL(String dataSource) return StringUtils.format("%sdatasources/%s/segments?full", getCoordinatorURL(), StringUtils.urlEncode(dataSource)); } - private String getLoadStatusURL() + private String getLoadStatusURL(String dataSource) { - return StringUtils.format("%s%s", getCoordinatorURL(), "loadstatus"); + return StringUtils.format("%sdatasources/%s/loadstatus?forceMetadataRefresh=true&interval=1970-01-01/2999-01-01", getCoordinatorURL(), StringUtils.urlEncode(dataSource)); } /** return a list of the segment dates for the specified data source */ @@ -173,11 +173,11 @@ public List getAvailableSegments(final String dataSource) } } - private Map getLoadStatus() + private Map getLoadStatus(String dataSorce) { Map status; try { - StatusResponseHolder response = makeRequest(HttpMethod.GET, getLoadStatusURL()); + StatusResponseHolder response = makeRequest(HttpMethod.GET, getLoadStatusURL(dataSorce)); status = jsonMapper.readValue( response.getContent(), new TypeReference>() @@ -201,7 +201,7 @@ private Map getLoadStatus() */ public boolean areSegmentsLoaded(String dataSource) { - final Map status = getLoadStatus(); + final Map status = getLoadStatus(dataSource); return (status.containsKey(dataSource) && status.get(dataSource) == 100.0); } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java index b76b035d01df..ead9f1337b38 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java @@ -41,6 +41,8 @@ public class TestNGGroup public static final String OTHER_INDEX = "other-index"; + public static final String APPEND_INGESTION = "append-ingestion"; + public static final String PERFECT_ROLLUP_PARALLEL_BATCH_INDEX = "perfect-rollup-parallel-batch-index"; /** diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java index 321d493acc95..f129aa775dfa 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java @@ -44,15 +44,21 @@ import java.util.UUID; import java.util.function.Function; -@Test(groups = {TestNGGroup.OTHER_INDEX}) +@Test(groups = {TestNGGroup.APPEND_INGESTION}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITAppendBatchIndexTest extends AbstractITBatchIndexTest { private static final Logger LOG = new Logger(ITAppendBatchIndexTest.class); private static final String INDEX_TASK = "/indexer/wikipedia_local_input_source_index_task.json"; + // This query file is for the initial ingestion which is one complete dataset with roll up private static final String INDEX_QUERIES_INITIAL_INGESTION_RESOURCE = "/indexer/wikipedia_index_queries.json"; - private static final String INDEX_QUERIES_POST_APPEND_PRE_COMPACT_RESOURCE = "/indexer/wikipedia_double_without_roll_up_index_queries.json"; - private static final String INDEX_QUERIES_POST_APPEND_POST_COMPACT_RESOURCE = "/indexer/wikipedia_double_with_roll_up_index_queries.json"; + // This query file is for the initial ingestion plus the append ingestion which are two complete dataset with roll + // up within each dataset (roll up within the initial ingestion and roll up within the append ingestion but not + // roll up across both dataset). + private static final String INDEX_QUERIES_POST_APPEND_PRE_COMPACT_RESOURCE = "/indexer/wikipedia_double_ingestion_non_perfect_rollup_index_queries.json"; + // This query file is for the initial ingestion plus the append ingestion plus a compaction task after the two ingestions. + // This is two complete dataset with perfect roll up across both dataset. + private static final String INDEX_QUERIES_POST_APPEND_POST_COMPACT_RESOURCE = "/indexer/wikipedia_double_ingestion_perfect_rollup_index_queries.json"; private static final String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json"; diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITSqlInputSourceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITSqlInputSourceTest.java index 015bafe1584b..4efe83afcf2f 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITSqlInputSourceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITSqlInputSourceTest.java @@ -38,7 +38,7 @@ @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITSqlInputSourceTest extends AbstractITBatchIndexTest { - private static final String INDEX_TASK = "/indexer/wikipedia_parallel_index_using_sqlinputsource_task.json"; + private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json"; private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json"; @DataProvider(parallel = true) diff --git a/integration-tests/src/test/resources/indexer/wikipedia_double_without_roll_up_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_double_ingestion_non_perfect_rollup_index_queries.json similarity index 100% rename from integration-tests/src/test/resources/indexer/wikipedia_double_without_roll_up_index_queries.json rename to integration-tests/src/test/resources/indexer/wikipedia_double_ingestion_non_perfect_rollup_index_queries.json diff --git a/integration-tests/src/test/resources/indexer/wikipedia_double_with_roll_up_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_double_ingestion_perfect_rollup_index_queries.json similarity index 100% rename from integration-tests/src/test/resources/indexer/wikipedia_double_with_roll_up_index_queries.json rename to integration-tests/src/test/resources/indexer/wikipedia_double_ingestion_perfect_rollup_index_queries.json From 0ef2b8eb23bca0564f25d89949cc9532b3fe292a Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 17 Jul 2020 13:54:47 -0700 Subject: [PATCH 10/20] fix --- .../CoordinatorResourceTestClient.java | 28 ++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java index 6e98cf783cae..8589f5b72942 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java @@ -175,9 +175,25 @@ public List getAvailableSegments(final String dataSource) private Map getLoadStatus(String dataSorce) { + String url = getLoadStatusURL(dataSorce); Map status; try { - StatusResponseHolder response = makeRequest(HttpMethod.GET, getLoadStatusURL(dataSorce)); + StatusResponseHolder response = httpClient.go( + new Request(HttpMethod.GET, new URL(url)), + responseHandler + ).get(); + + if (response.getStatus().getCode() == HttpResponseStatus.NO_CONTENT.getCode()) { + return null; + } + if (response.getStatus().getCode() != HttpResponseStatus.OK.getCode()) { + throw new ISE( + "Error while making request to url[%s] status[%s] content[%s]", + url, + response.getStatus(), + response.getContent() + ); + } status = jsonMapper.readValue( response.getContent(), new TypeReference>() @@ -191,18 +207,10 @@ private Map getLoadStatus(String dataSorce) return status; } - /** - * Warning: This API reads segments from {@link SqlSegmentsMetadataManager} of the Coordinator which - * caches segments in memory and periodically updates them. Hence, there can be a race condition as - * this API implementation compares segments metadata from cache with segments in historicals. - * Particularly, when number of segment changes after the first initial load of the datasource. - * Workaround is to verify the number of segments matches expected from {@link #getSegments(String) getSegments} - * before calling this method (since, that would wait until the cache is updated with expected data) - */ public boolean areSegmentsLoaded(String dataSource) { final Map status = getLoadStatus(dataSource); - return (status.containsKey(dataSource) && status.get(dataSource) == 100.0); + return (status != null && status.containsKey(dataSource) && status.get(dataSource) == 100.0); } public void unloadSegmentsForDataSource(String dataSource) From 56413b1a01b453aa059e518d4a914c6062c7a6b4 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 17 Jul 2020 13:55:20 -0700 Subject: [PATCH 11/20] fix --- .../druid/testing/clients/CoordinatorResourceTestClient.java | 1 - 1 file changed, 1 deletion(-) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java index 8589f5b72942..c0e04b9ab21b 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java @@ -31,7 +31,6 @@ import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.StatusResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; -import org.apache.druid.metadata.SqlSegmentsMetadataManager; import org.apache.druid.query.lookup.LookupsState; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.lookup.cache.LookupExtractorFactoryMapContainer; From 07c8c40e429f90c1f578b960abb5f7953298c7c2 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 17 Jul 2020 16:33:31 -0700 Subject: [PATCH 12/20] fix --- .../indexer/AbstractITBatchIndexTest.java | 55 +++++++++++++++++++ .../tests/indexer/ITAppendBatchIndexTest.java | 47 ---------------- 2 files changed, 55 insertions(+), 47 deletions(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java index 5b55b4e9ba7b..814e1dabaead 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.FluentIterable; import com.google.inject.Inject; import org.apache.commons.io.IOUtils; +import org.apache.druid.indexer.partitions.SecondaryPartitionType; import org.apache.druid.indexing.common.task.batch.parallel.PartialDimensionDistributionTask; import org.apache.druid.indexing.common.task.batch.parallel.PartialGenericSegmentMergeTask; import org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateTask; @@ -43,6 +44,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.List; import java.util.function.Function; @@ -325,4 +327,57 @@ private long countCompleteSubTasks(final String dataSource, final boolean perfec }) .count(); } + + void verifySegmentsCountAndLoaded(String dataSource, int numExpectedSegments) + { + ITRetryUtil.retryUntilTrue( + () -> coordinator.areSegmentsLoaded(dataSource + config.getExtraDatasourceNameSuffix()), + "Segment load check" + ); + ITRetryUtil.retryUntilTrue( + () -> { + int metadataSegmentCount = coordinator.getSegments( + dataSource + config.getExtraDatasourceNameSuffix() + ).size(); + LOG.info("Current metadata segment count: %d, expected: %d", metadataSegmentCount, numExpectedSegments); + return metadataSegmentCount == numExpectedSegments; + }, + "Segment count check" + ); + } + + void checkCompactionIntervals(String dataSource, List expectedIntervals) + { + ITRetryUtil.retryUntilTrue( + () -> { + final List actualIntervals = coordinator.getSegmentIntervals( + dataSource + config.getExtraDatasourceNameSuffix() + ); + actualIntervals.sort(null); + return actualIntervals.equals(expectedIntervals); + }, + "Compaction interval check" + ); + } + + void verifySegmentsCompacted(String dataSource, int expectedCompactedSegmentCount) + { + List segments = coordinator.getFullSegmentsMetadata( + dataSource + config.getExtraDatasourceNameSuffix() + ); + List foundCompactedSegments = new ArrayList<>(); + for (DataSegment segment : segments) { + if (segment.getLastCompactionState() != null) { + foundCompactedSegments.add(segment); + } + } + Assert.assertEquals(foundCompactedSegments.size(), expectedCompactedSegmentCount); + for (DataSegment compactedSegment : foundCompactedSegments) { + Assert.assertNotNull(compactedSegment.getLastCompactionState()); + Assert.assertNotNull(compactedSegment.getLastCompactionState().getPartitionsSpec()); + Assert.assertEquals(compactedSegment.getLastCompactionState().getPartitionsSpec().getType(), + SecondaryPartitionType.LINEAR + ); + } + } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java index f129aa775dfa..515a0310e4b5 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java @@ -201,51 +201,4 @@ private void compactData(String fullDatasourceName) throws Exception "Segment Compaction" ); } - - private void verifySegmentsCountAndLoaded(String fullDatasourceName, int numExpectedSegments) - { - ITRetryUtil.retryUntilTrue( - () -> { - int metadataSegmentCount = coordinator.getSegments(fullDatasourceName).size(); - LOG.info("Current metadata segment count: %d, expected: %d", metadataSegmentCount, numExpectedSegments); - return metadataSegmentCount == numExpectedSegments; - }, - "Segment count check" - ); - ITRetryUtil.retryUntilTrue( - () -> coordinator.areSegmentsLoaded(fullDatasourceName), - "Segment load check" - ); - } - - private void checkCompactionIntervals(String fullDatasourceName, List expectedIntervals) - { - ITRetryUtil.retryUntilTrue( - () -> { - final List actualIntervals = coordinator.getSegmentIntervals(fullDatasourceName); - actualIntervals.sort(null); - return actualIntervals.equals(expectedIntervals); - }, - "Compaction interval check" - ); - } - - private void verifySegmentsCompacted(String fullDatasourceName, int expectedCompactedSegmentCount) - { - List segments = coordinator.getFullSegmentsMetadata(fullDatasourceName); - List foundCompactedSegments = new ArrayList<>(); - for (DataSegment segment : segments) { - if (segment.getLastCompactionState() != null) { - foundCompactedSegments.add(segment); - } - } - Assert.assertEquals(foundCompactedSegments.size(), expectedCompactedSegmentCount); - for (DataSegment compactedSegment : foundCompactedSegments) { - Assert.assertNotNull(compactedSegment.getLastCompactionState()); - Assert.assertNotNull(compactedSegment.getLastCompactionState().getPartitionsSpec()); - Assert.assertEquals(compactedSegment.getLastCompactionState().getPartitionsSpec().getType(), - SecondaryPartitionType.LINEAR - ); - } - } } From f00e70261e1c93b7e910637a0bcc5658fbc72efd Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 17 Jul 2020 16:35:28 -0700 Subject: [PATCH 13/20] fix --- .../apache/druid/tests/indexer/ITAppendBatchIndexTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java index 515a0310e4b5..dcfcb8f73709 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java @@ -24,21 +24,17 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; -import org.apache.druid.indexer.partitions.SecondaryPartitionType; import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.tests.TestNGGroup; -import org.apache.druid.timeline.DataSegment; -import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Guice; import org.testng.annotations.Test; import java.io.Closeable; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; From 52aa434c4ebe113f6dd618e211df4bbd5e58b566 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 17 Jul 2020 18:08:13 -0700 Subject: [PATCH 14/20] fix --- .../druid/tests/indexer/AbstractITBatchIndexTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java index 814e1dabaead..d294c6a3e9ef 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java @@ -129,11 +129,11 @@ protected void doIndexTest( submitTaskAndWait(taskSpec, fullDatasourceName, waitForNewVersion, waitForSegmentsToLoad); if (runTestQueries) { - doTestQuery(fullDatasourceName, queryFilePath, 2); + doTestQuery(dataSource, queryFilePath, 2); } } - protected void doTestQuery(String fullDatasourceName, String queryFilePath, int timesToRun) + protected void doTestQuery(String dataSource, String queryFilePath, int timesToRun) { try { String queryResponseTemplate; @@ -148,7 +148,7 @@ protected void doTestQuery(String fullDatasourceName, String queryFilePath, int queryResponseTemplate = StringUtils.replace( queryResponseTemplate, "%%DATASOURCE%%", - fullDatasourceName + dataSource + config.getExtraDatasourceNameSuffix() ); queryHelper.testQueriesFromString(queryResponseTemplate, timesToRun); From 6cee40e039da3b22055d10444908fc149897f109 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 17 Jul 2020 21:33:21 -0700 Subject: [PATCH 15/20] fix --- .../duty/ITAutoCompactionTest.java | 26 +++++++++---------- .../indexer/AbstractITBatchIndexTest.java | 18 +++++++++++-- .../tests/indexer/ITAppendBatchIndexTest.java | 20 +------------- 3 files changed, 29 insertions(+), 35 deletions(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 3618d84a5a02..53c5fb47d469 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -87,15 +87,15 @@ public void testAutoCompactionDutySubmitAndVerifyCompaction() throws Exception verifyQuery(INDEX_QUERIES_RESOURCE); submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1)); - //...compacted into 1 new segment for 1 day. 1 day compacted and 1 day skipped/remains uncompacted. (5 total) - forceTriggerAutoCompaction(5); + //...compacted into 1 new segment for 1 day. 1 day compacted and 1 day skipped/remains uncompacted. (3 total) + forceTriggerAutoCompaction(3); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED); checkCompactionIntervals(intervalsBeforeCompaction); submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST); - //...compacted into 1 new segment for the remaining one day. 2 day compacted and 0 day uncompacted. (6 total) - forceTriggerAutoCompaction(6); + //...compacted into 1 new segment for the remaining one day. 2 day compacted and 0 day uncompacted. (2 total) + forceTriggerAutoCompaction(2); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED); checkCompactionIntervals(intervalsBeforeCompaction); @@ -119,8 +119,8 @@ public void testAutoCompactionDutyCanUpdateCompactionConfig() throws Exception submitCompactionConfig(1, SKIP_OFFSET_FROM_LATEST); // Instead of merging segments, the updated config will split segments! - //...compacted into 10 new segments across 2 days. 5 new segments each day (14 total) - forceTriggerAutoCompaction(14); + //...compacted into 10 new segments across 2 days. 5 new segments each day (10 total) + forceTriggerAutoCompaction(10); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(10, 1); @@ -162,11 +162,9 @@ public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception verifySegmentsCount(4); verifyQuery(INDEX_QUERIES_RESOURCE); - // Skips first day. Should only compact one out of two days. - submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST); - // Set compactionTaskSlotRatio to 0 to prevent any compaction updateCompactionTaskSlot(0, 100); + submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST); // ...should remains unchanged (4 total) forceTriggerAutoCompaction(4); verifyQuery(INDEX_QUERIES_RESOURCE); @@ -183,15 +181,15 @@ public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception // Update compaction slots to be 1 updateCompactionTaskSlot(1, 1); - // One day compacted (1 new segment) and one day remains uncompacted. (5 total) - forceTriggerAutoCompaction(5); + // One day compacted (1 new segment) and one day remains uncompacted. (3 total) + forceTriggerAutoCompaction(3); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED); checkCompactionIntervals(intervalsBeforeCompaction); Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"), "14312"); // Run compaction again to compact the remaining day - // Remaining day compacted (1 new segment). Now both days compacted (6 total) - forceTriggerAutoCompaction(6); + // Remaining day compacted (1 new segment). Now both days compacted (2 total) + forceTriggerAutoCompaction(2); verifyQuery(INDEX_QUERIES_RESOURCE); verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED); checkCompactionIntervals(intervalsBeforeCompaction); @@ -283,11 +281,11 @@ private void forceTriggerAutoCompaction(int numExpectedSegmentsAfterCompaction) { compactionResource.forceTriggerAutoCompaction(); waitForAllTasksToCompleteForDataSource(fullDatasourceName); - verifySegmentsCount(numExpectedSegmentsAfterCompaction); ITRetryUtil.retryUntilTrue( () -> coordinator.areSegmentsLoaded(fullDatasourceName), "Segment Compaction" ); + verifySegmentsCount(numExpectedSegmentsAfterCompaction); } private void verifySegmentsCount(int numExpectedSegments) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java index d294c6a3e9ef..222d7e727a4a 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java @@ -346,15 +346,29 @@ void verifySegmentsCountAndLoaded(String dataSource, int numExpectedSegments) ); } - void checkCompactionIntervals(String dataSource, List expectedIntervals) + void compactData(String dataSource, String compactionTask) throws Exception { + final String fullDatasourceName = dataSource + config.getExtraDatasourceNameSuffix(); + final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); + intervalsBeforeCompaction.sort(null); + final String template = getResourceAsString(compactionTask); + String taskSpec = StringUtils.replace(template, "%%DATASOURCE%%", fullDatasourceName); + + final String taskID = indexer.submitTask(taskSpec); + LOG.info("TaskID for compaction task %s", taskID); + indexer.waitUntilTaskCompletes(taskID); + + ITRetryUtil.retryUntilTrue( + () -> coordinator.areSegmentsLoaded(fullDatasourceName), + "Segment Compaction" + ); ITRetryUtil.retryUntilTrue( () -> { final List actualIntervals = coordinator.getSegmentIntervals( dataSource + config.getExtraDatasourceNameSuffix() ); actualIntervals.sort(null); - return actualIntervals.equals(expectedIntervals); + return actualIntervals.equals(intervalsBeforeCompaction); }, "Compaction interval check" ); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java index dcfcb8f73709..321c72843c2c 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java @@ -105,11 +105,8 @@ public void doIndexTest(List partitionsSpecList, List e verifySegmentsCountAndLoaded(indexDatasource, expectedSegmentCountList.get(1)); doTestQuery(indexDatasource, INDEX_QUERIES_POST_APPEND_PRE_COMPACT_RESOURCE, 2); // Submit compaction task - final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(indexDatasource); - intervalsBeforeCompaction.sort(null); - compactData(indexDatasource); + compactData(indexDatasource, COMPACTION_TASK); // Verification post compaction - checkCompactionIntervals(indexDatasource, intervalsBeforeCompaction); verifySegmentsCountAndLoaded(indexDatasource, expectedSegmentCountList.get(2)); verifySegmentsCompacted(indexDatasource, expectedSegmentCountList.get(2)); doTestQuery(indexDatasource, INDEX_QUERIES_POST_APPEND_POST_COMPACT_RESOURCE, 2); @@ -182,19 +179,4 @@ private void submitIngestionTaskAndVerify( true ); } - - private void compactData(String fullDatasourceName) throws Exception - { - final String template = getResourceAsString(COMPACTION_TASK); - String taskSpec = StringUtils.replace(template, "%%DATASOURCE%%", fullDatasourceName); - - final String taskID = indexer.submitTask(taskSpec); - LOG.info("TaskID for compaction task %s", taskID); - indexer.waitUntilTaskCompletes(taskID); - - ITRetryUtil.retryUntilTrue( - () -> coordinator.areSegmentsLoaded(fullDatasourceName), - "Segment Compaction" - ); - } } From 296942de34cd9db851e904d8f6d19def1490e324 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 17 Jul 2020 21:34:00 -0700 Subject: [PATCH 16/20] fix --- .../org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java index 321c72843c2c..94e4d23f75ce 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java @@ -28,7 +28,6 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.testing.guice.DruidTestModuleFactory; -import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.tests.TestNGGroup; import org.testng.annotations.DataProvider; import org.testng.annotations.Guice; From a97ae58b5eb982791dd3f555a8bb794d7703c486 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Sat, 18 Jul 2020 15:39:55 -0700 Subject: [PATCH 17/20] fix --- .../utils/DruidClusterAdminClient.java | 31 +++++++++++++++++++ .../duty/ITAutoCompactionTest.java | 4 +-- .../indexer/AbstractITBatchIndexTest.java | 6 ++-- 3 files changed, 36 insertions(+), 5 deletions(-) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java index 4c6518d535bc..0f04cd1d659c 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java @@ -19,6 +19,7 @@ package org.apache.druid.testing.utils; +import com.fasterxml.jackson.databind.ObjectMapper; import com.github.dockerjava.api.DockerClient; import com.github.dockerjava.api.model.Container; import com.github.dockerjava.core.DockerClientBuilder; @@ -31,6 +32,7 @@ import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.StatusResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.guice.TestClient; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -51,15 +53,18 @@ public class DruidClusterAdminClient private static final String ROUTER_DOCKER_CONTAINER_NAME = "/druid-router"; private static final String MIDDLEMANAGER_DOCKER_CONTAINER_NAME = "/druid-middlemanager"; + private final ObjectMapper jsonMapper; private final HttpClient httpClient; private IntegrationTestingConfig config; @Inject DruidClusterAdminClient( + ObjectMapper jsonMapper, @TestClient HttpClient httpClient, IntegrationTestingConfig config ) { + this.jsonMapper = jsonMapper; this.httpClient = httpClient; this.config = config; } @@ -97,6 +102,7 @@ public void restartMiddleManagerContainer() public void waitUntilCoordinatorReady() { waitUntilInstanceReady(config.getCoordinatorUrl()); + postDynamicConfig(CoordinatorDynamicConfig.builder().withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(1).build()); } public void waitUntilHistoricalReady() @@ -159,4 +165,29 @@ private void waitUntilInstanceReady(final String host) "Waiting for instance to be ready: [" + host + "]" ); } + + private void postDynamicConfig(CoordinatorDynamicConfig coordinatorDynamicConfig) + { + ITRetryUtil.retryUntilTrue( + () -> { + try { + String url = StringUtils.format("%sconfig", config.getCoordinatorUrl()); + StatusResponseHolder response = httpClient.go( + new Request(HttpMethod.POST, new URL(url)).setContent( + "application/json", + jsonMapper.writeValueAsBytes(coordinatorDynamicConfig) + ), StatusResponseHandler.getInstance() + ).get(); + + LOG.info("%s %s", response.getStatus(), response.getContent()); + return response.getStatus().equals(HttpResponseStatus.OK); + } + catch (Throwable e) { + LOG.error(e, ""); + return false; + } + }, + "Posting dynamic config after startup" + ); + } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 53c5fb47d469..920a3110ff5a 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -154,6 +154,8 @@ public void testAutoCompactionDutyCanDeleteCompactionConfig() throws Exception @Test public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception { + // Set compactionTaskSlotRatio to 0 to prevent any compaction + updateCompactionTaskSlot(0, 100); loadData(INDEX_TASK); try (final Closeable ignored = unloader(fullDatasourceName)) { final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); @@ -162,8 +164,6 @@ public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception verifySegmentsCount(4); verifyQuery(INDEX_QUERIES_RESOURCE); - // Set compactionTaskSlotRatio to 0 to prevent any compaction - updateCompactionTaskSlot(0, 100); submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST); // ...should remains unchanged (4 total) forceTriggerAutoCompaction(4); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java index 222d7e727a4a..9a36015aed1f 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java @@ -336,11 +336,11 @@ void verifySegmentsCountAndLoaded(String dataSource, int numExpectedSegments) ); ITRetryUtil.retryUntilTrue( () -> { - int metadataSegmentCount = coordinator.getSegments( + int segmentCount = coordinator.getAvailableSegments( dataSource + config.getExtraDatasourceNameSuffix() ).size(); - LOG.info("Current metadata segment count: %d, expected: %d", metadataSegmentCount, numExpectedSegments); - return metadataSegmentCount == numExpectedSegments; + LOG.info("Current segment count: %d, expected: %d", segmentCount, numExpectedSegments); + return segmentCount == numExpectedSegments; }, "Segment count check" ); From 7df13ed517a0c617a42891c3edf71d559d8a6cf4 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Sat, 18 Jul 2020 21:59:57 -0700 Subject: [PATCH 18/20] fix --- .../org/apache/druid/testing/utils/DruidClusterAdminClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java index 0f04cd1d659c..b4143701277f 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java @@ -171,7 +171,7 @@ private void postDynamicConfig(CoordinatorDynamicConfig coordinatorDynamicConfig ITRetryUtil.retryUntilTrue( () -> { try { - String url = StringUtils.format("%sconfig", config.getCoordinatorUrl()); + String url = StringUtils.format("%s/config", config.getCoordinatorUrl()); StatusResponseHolder response = httpClient.go( new Request(HttpMethod.POST, new URL(url)).setContent( "application/json", From 51b1fe938d11ba66e845c1d3346021a715b32d0c Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Sun, 19 Jul 2020 11:02:54 -0700 Subject: [PATCH 19/20] fix --- .../org/apache/druid/testing/utils/DruidClusterAdminClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java index b4143701277f..d3bbd1f0cfc0 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java @@ -171,7 +171,7 @@ private void postDynamicConfig(CoordinatorDynamicConfig coordinatorDynamicConfig ITRetryUtil.retryUntilTrue( () -> { try { - String url = StringUtils.format("%s/config", config.getCoordinatorUrl()); + String url = StringUtils.format("%s/druid/coordinator/v1/config", config.getCoordinatorUrl()); StatusResponseHolder response = httpClient.go( new Request(HttpMethod.POST, new URL(url)).setContent( "application/json", From 8e3b8fca8bb9b0085acc251254663f48e3a6447c Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Sun, 19 Jul 2020 13:31:19 -0700 Subject: [PATCH 20/20] fix --- .../tests/coordinator/duty/ITAutoCompactionTest.java | 10 +--------- .../wikipedia_local_input_source_index_task.json | 2 +- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 920a3110ff5a..f123a09efcf6 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -155,7 +155,7 @@ public void testAutoCompactionDutyCanDeleteCompactionConfig() throws Exception public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception { // Set compactionTaskSlotRatio to 0 to prevent any compaction - updateCompactionTaskSlot(0, 100); + updateCompactionTaskSlot(0, 0); loadData(INDEX_TASK); try (final Closeable ignored = unloader(fullDatasourceName)) { final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); @@ -171,14 +171,6 @@ public void testAutoCompactionDutyCanUpdateTaskSlots() throws Exception verifySegmentsCompacted(0, null); checkCompactionIntervals(intervalsBeforeCompaction); - // Set maxCompactionTaskSlots to 0 to prevent any compaction - updateCompactionTaskSlot(0.1, 0); - // ...should remains unchanged (4 total) - forceTriggerAutoCompaction(4); - verifyQuery(INDEX_QUERIES_RESOURCE); - verifySegmentsCompacted(0, null); - checkCompactionIntervals(intervalsBeforeCompaction); - // Update compaction slots to be 1 updateCompactionTaskSlot(1, 1); // One day compacted (1 new segment) and one day remains uncompacted. (3 total) diff --git a/integration-tests/src/test/resources/indexer/wikipedia_local_input_source_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_local_input_source_index_task.json index 9d870cdc3251..a533a848e337 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_local_input_source_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_local_input_source_index_task.json @@ -76,7 +76,7 @@ }, "tuningConfig": { "type": "index_parallel", - "maxNumConcurrentSubTasks": 10, + "maxNumConcurrentSubTasks": 4, "splitHintSpec": { "type": "maxSize", "maxSplitSize": 1