From 488e15062e6eff0ba6ccf643721064fb68079232 Mon Sep 17 00:00:00 2001 From: David Glasser Date: Thu, 7 Mar 2019 17:29:21 -0800 Subject: [PATCH 1/2] integration-tests: make ITParallelIndexTest still work in parallel Follow-up to #7181, which made the default behavior for index_parallel tasks non-parallel. --- .../test/resources/indexer/wikipedia_parallel_index_task.json | 4 ++++ .../resources/indexer/wikipedia_parallel_reindex_task.json | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json index f317c538f6b6..887508ad7e97 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json @@ -61,6 +61,10 @@ "baseDir": "/resources/data/batch_index", "filter": "wikipedia_index_data*" } + }, + "tuningConfig": { + "type": "index_parallel", + "maxNumSubTasks": 10 } } } \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_task.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_task.json index c06890bfde4e..ef16c648cb88 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_task.json @@ -60,6 +60,10 @@ "baseDir": "/resources/data/batch_index", "filter": "wikipedia_index_data2*" } + }, + "tuningConfig": { + "type": "index_parallel", + "maxNumSubTasks": 10 } } } \ No newline at end of file From a42503291c16d9d1b264455aa93f24de454ae2d6 Mon Sep 17 00:00:00 2001 From: David Glasser Date: Thu, 7 Mar 2019 21:37:52 -0800 Subject: [PATCH 2/2] Validate that parallel index subtasks were run --- .../clients/OverlordResourceTestClient.java | 14 ++++++++++- .../testing/clients/TaskResponseObject.java | 9 ++++++++ .../indexer/AbstractITBatchIndexTest.java | 23 +++++++++++++++++++ 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java index 5cee2fb28866..f2e600e42646 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java @@ -153,6 +153,11 @@ public List getPendingTasks() return getTasks("pendingTasks"); } + public List getCompleteTasksForDataSource(final String dataSource) + { + return getTasks(StringUtils.format("tasks?state=complete&datasource=%s", StringUtils.urlEncode(dataSource))); + } + private List getTasks(String identifier) { try { @@ -233,7 +238,14 @@ public void shutdownSupervisor(String id) { try { StatusResponseHolder response = httpClient.go( - new Request(HttpMethod.POST, new URL(StringUtils.format("%ssupervisor/%s/shutdown", getIndexerURL(), StringUtils.urlEncode(id)))), + new Request( + HttpMethod.POST, + new URL(StringUtils.format( + "%ssupervisor/%s/shutdown", + getIndexerURL(), + StringUtils.urlEncode(id) + )) + ), responseHandler ).get(); if (!response.getStatus().equals(HttpResponseStatus.OK)) { diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java index 501cd5e13984..38f0a86223e6 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java @@ -28,6 +28,7 @@ public class TaskResponseObject { private final String id; + private final String type; private final DateTime createdTime; private final DateTime queueInsertionTime; private final TaskState status; @@ -35,12 +36,14 @@ public class TaskResponseObject @JsonCreator private TaskResponseObject( @JsonProperty("id") String id, + @JsonProperty("type") String type, @JsonProperty("createdTime") DateTime createdTime, @JsonProperty("queueInsertionTime") DateTime queueInsertionTime, @JsonProperty("status") TaskState status ) { this.id = id; + this.type = type; this.createdTime = createdTime; this.queueInsertionTime = queueInsertionTime; this.status = status; @@ -52,6 +55,12 @@ public String getId() return id; } + @SuppressWarnings("unused") // Used by Jackson serialization? + public String getType() + { + return type; + } + @SuppressWarnings("unused") // Used by Jackson serialization? public DateTime getCreatedTime() { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java index 226824f476f5..121fd7a9bdab 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java @@ -160,10 +160,25 @@ private void submitTaskAndWait(String taskSpec, String dataSourceName, boolean w { final Set oldVersions = waitForNewVersion ? coordinator.getSegmentVersions(dataSourceName) : null; + long startSubTaskCount = -1; + final boolean assertRunsSubTasks = taskSpec.contains("index_parallel"); + if (assertRunsSubTasks) { + startSubTaskCount = countCompleteSubTasks(dataSourceName); + } + final String taskID = indexer.submitTask(taskSpec); LOG.info("TaskID for loading index task %s", taskID); indexer.waitUntilTaskCompletes(taskID); + if (assertRunsSubTasks) { + final long newSubTasks = countCompleteSubTasks(dataSourceName) - startSubTaskCount; + Assert.assertTrue( + StringUtils.format( + "The supervisor task[%s] didn't create any sub tasks. Was it executed in the parallel mode?", + taskID + ), newSubTasks > 0); + } + // ITParallelIndexTest does a second round of ingestion to replace segements in an existing // data source. For that second round we need to make sure the coordinator actually learned // about the new segments befor waiting for it to report that all segments are loaded; otherwise @@ -179,4 +194,12 @@ private void submitTaskAndWait(String taskSpec, String dataSourceName, boolean w () -> coordinator.areSegmentsLoaded(dataSourceName), "Segment Load" ); } + + private long countCompleteSubTasks(final String dataSource) + { + return indexer.getCompleteTasksForDataSource(dataSource) + .stream() + .filter(t -> t.getType().equals("index_sub")) + .count(); + } }