Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ public List<TaskResponseObject> getPendingTasks()
return getTasks("pendingTasks");
}

public List<TaskResponseObject> getCompleteTasksForDataSource(final String dataSource)
{
return getTasks(StringUtils.format("tasks?state=complete&datasource=%s", StringUtils.urlEncode(dataSource)));
}

private List<TaskResponseObject> getTasks(String identifier)
{
try {
Expand Down Expand Up @@ -233,7 +238,14 @@ public void shutdownSupervisor(String id)
{
try {
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.POST, new URL(StringUtils.format("%ssupervisor/%s/shutdown", getIndexerURL(), StringUtils.urlEncode(id)))),
new Request(
HttpMethod.POST,
new URL(StringUtils.format(
"%ssupervisor/%s/shutdown",
getIndexerURL(),
StringUtils.urlEncode(id)
))
),
responseHandler
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,22 @@ public class TaskResponseObject
{

private final String id;
private final String type;
private final DateTime createdTime;
private final DateTime queueInsertionTime;
private final TaskState status;

@JsonCreator
private TaskResponseObject(
@JsonProperty("id") String id,
@JsonProperty("type") String type,
@JsonProperty("createdTime") DateTime createdTime,
@JsonProperty("queueInsertionTime") DateTime queueInsertionTime,
@JsonProperty("status") TaskState status
)
{
this.id = id;
this.type = type;
this.createdTime = createdTime;
this.queueInsertionTime = queueInsertionTime;
this.status = status;
Expand All @@ -52,6 +55,12 @@ public String getId()
return id;
}

@SuppressWarnings("unused") // Used by Jackson serialization?
public String getType()
{
return type;
}

@SuppressWarnings("unused") // Used by Jackson serialization?
public DateTime getCreatedTime()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,25 @@ private void submitTaskAndWait(String taskSpec, String dataSourceName, boolean w
{
final Set<String> oldVersions = waitForNewVersion ? coordinator.getSegmentVersions(dataSourceName) : null;

long startSubTaskCount = -1;
final boolean assertRunsSubTasks = taskSpec.contains("index_parallel");
if (assertRunsSubTasks) {
startSubTaskCount = countCompleteSubTasks(dataSourceName);
}

final String taskID = indexer.submitTask(taskSpec);
LOG.info("TaskID for loading index task %s", taskID);
indexer.waitUntilTaskCompletes(taskID);

if (assertRunsSubTasks) {
final long newSubTasks = countCompleteSubTasks(dataSourceName) - startSubTaskCount;
Assert.assertTrue(
StringUtils.format(
"The supervisor task[%s] didn't create any sub tasks. Was it executed in the parallel mode?",
taskID
), newSubTasks > 0);
}

// ITParallelIndexTest does a second round of ingestion to replace segements in an existing
// data source. For that second round we need to make sure the coordinator actually learned
// about the new segments befor waiting for it to report that all segments are loaded; otherwise
Expand All @@ -179,4 +194,12 @@ private void submitTaskAndWait(String taskSpec, String dataSourceName, boolean w
() -> coordinator.areSegmentsLoaded(dataSourceName), "Segment Load"
);
}

private long countCompleteSubTasks(final String dataSource)
{
return indexer.getCompleteTasksForDataSource(dataSource)
.stream()
.filter(t -> t.getType().equals("index_sub"))
.count();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@
"baseDir": "/resources/data/batch_index",
"filter": "wikipedia_index_data*"
}
},
"tuningConfig": {
"type": "index_parallel",
"maxNumSubTasks": 10
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@
"baseDir": "/resources/data/batch_index",
"filter": "wikipedia_index_data2*"
}
},
"tuningConfig": {
"type": "index_parallel",
"maxNumSubTasks": 10
}
}
}