Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.druid.msq.statistics.CompleteKeyStatisticsInformation;

import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
Expand Down Expand Up @@ -244,29 +245,13 @@ public void sequentialTimeChunkMerging(
throw new ISE("All worker partial key information not received for stage[%d]", stageId.getStageNumber());
}

if (completeKeyStatisticsInformation.getTimeSegmentVsWorkerMap().isEmpty()) {
// No time chunks at all: skip fetching.
kernelActions.accept(
kernel -> {
for (final String taskId : tasks) {
final int workerNumber = MSQTasks.workerFromTaskId(taskId);
kernel.mergeClusterByStatisticsCollectorForAllTimeChunks(
stageId,
workerNumber,
ClusterByStatisticsSnapshot.empty()
);
}
}
);

return;
}

final Set<String> noBoundaries = new HashSet<>(tasks);
completeKeyStatisticsInformation.getTimeSegmentVsWorkerMap().forEach((timeChunk, wks) -> {

for (String taskId : tasks) {
int workerNumber = MSQTasks.workerFromTaskId(taskId);
if (wks.contains(workerNumber)) {
noBoundaries.remove(taskId);
executorService.submit(() -> {
fetchStatsFromWorker(
kernelActions,
Expand All @@ -285,10 +270,24 @@ public void sequentialTimeChunkMerging(
),
retryOperation
);

});
}
}
});

// Workers that appear in no time chunk received no records: merge an empty statistics snapshot for each of them so their state is still updated.
for (String taskId : noBoundaries) {
kernelActions.accept(
kernel -> {
final int workerNumber = MSQTasks.workerFromTaskId(taskId);
kernel.mergeClusterByStatisticsCollectorForAllTimeChunks(
stageId,
workerNumber,
ClusterByStatisticsSnapshot.empty()
);
});
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,15 @@ public void test_submitFetcherTask_parallelFetch() throws InterruptedException
latch.countDown();
}, stageDefinition.getId(), ImmutableSet.copyOf(TASK_IDS), ((queryKernel, integer, msqFault) -> {}));

latch.await(5, TimeUnit.SECONDS);
Assert.assertEquals(0, latch.getCount());
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));

}

@Test
public void test_submitFetcherTask_sequentialFetch() throws InterruptedException
{
doReturn(true).when(completeKeyStatisticsInformation).isComplete();
final CountDownLatch latch = new CountDownLatch(TASK_IDS.size() - 1);
final CountDownLatch latch = new CountDownLatch(TASK_IDS.size());

target = spy(new WorkerSketchFetcher(workerClient, workerTaskLauncher, true));

Expand All @@ -143,8 +142,8 @@ public void test_submitFetcherTask_sequentialFetch() throws InterruptedException
((queryKernel, integer, msqFault) -> {})
);

latch.await(5, TimeUnit.SECONDS);
Assert.assertEquals(0, latch.getCount());
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));

}

@Test
Expand Down Expand Up @@ -186,17 +185,15 @@ public void test_inMemoryRetryEnabled_retryInvoked() throws InterruptedException
})
);

latch.await(5, TimeUnit.SECONDS);
retryLatch.await(5, TimeUnit.SECONDS);
Assert.assertEquals(0, latch.getCount());
Assert.assertEquals(0, retryLatch.getCount());
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(retryLatch.await(5, TimeUnit.SECONDS));
}

@Test
public void test_SequentialRetryEnabled_retryInvoked() throws InterruptedException
{
doReturn(true).when(completeKeyStatisticsInformation).isComplete();
final CountDownLatch latch = new CountDownLatch(TASK_IDS.size() - 1);
final CountDownLatch latch = new CountDownLatch(TASK_IDS.size());

target = spy(new WorkerSketchFetcher(workerClient, workerTaskLauncher, true));

Expand All @@ -217,10 +214,8 @@ public void test_SequentialRetryEnabled_retryInvoked() throws InterruptedExcepti
})
);

latch.await(5, TimeUnit.SECONDS);
retryLatch.await(5, TimeUnit.SECONDS);
Assert.assertEquals(0, latch.getCount());
Assert.assertEquals(0, retryLatch.getCount());
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(retryLatch.await(5, TimeUnit.SECONDS));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,79 @@ public void testMsqIngestionSequentialMerging() throws Exception

msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
}


/**
 * Runs an MSQ REPLACE ingestion of the two wikipedia sample files with the cluster statistics
 * merge mode set to SEQUENTIAL and at most 3 tasks, restricted to rows where {@code delta=111}.
 * According to the inline note next to the filter, the matching rows live in only one of the
 * input files, so one worker is expected to contribute empty partitions. After the task succeeds
 * and the datasource is ready, the results are checked against
 * {@code wikipedia_msq_select_query_sequential_test.json}.
 *
 * @throws Exception if submitting, polling, loading, or querying fails
 */
@Test
public void testMsqIngestionSequentialMergingWithEmptyStatistics() throws Exception
{
String datasource = "dst";

// Clear up the datasource from the previous runs
coordinatorClient.unloadSegmentsForDataSource(datasource);

// REPLACE query over both local sample files; the single %s placeholder is the target datasource.
String queryLocal =
StringUtils.format(
"Replace INTO %s overwrite ALL \n"
+ "SELECT\n"
+ " TIME_PARSE(\"timestamp\") AS __time,\n"
+ " isRobot,\n"
+ " diffUrl,\n"
+ " added,\n"
+ " countryIsoCode,\n"
+ " regionName,\n"
+ " channel,\n"
+ " flags,\n"
+ " delta,\n"
+ " isUnpatrolled,\n"
+ " isNew,\n"
+ " deltaBucket,\n"
+ " isMinor,\n"
+ " isAnonymous,\n"
+ " deleted,\n"
+ " cityName,\n"
+ " metroCode,\n"
+ " namespace,\n"
+ " comment,\n"
+ " page,\n"
+ " commentLength,\n"
+ " countryName,\n"
+ " user,\n"
+ " regionIsoCode\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\",\"/resources/data/batch_index/json/wikipedia_index_data2.json\"]}',\n"
+ " '{\"type\":\"json\"}',\n"
+ " '[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"isNew\"},{\"type\":\"double\",\"name\":\"deltaBucket\"},{\"type\":\"string\",\"name\":\"isMinor\"},{\"type\":\"string\",\"name\":\"isAnonymous\"},{\"type\":\"long\",\"name\":\"deleted\"},{\"type\":\"string\",\"name\":\"cityName\"},{\"type\":\"long\",\"name\":\"metroCode\"},{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},{\"type\":\"string\",\"name\":\"page\"},{\"type\":\"long\",\"name\":\"commentLength\"},{\"type\":\"string\",\"name\":\"countryName\"},{\"type\":\"string\",\"name\":\"user\"},{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n"
+ " )\n"
+ ")\n"
+ "where delta=111 "
// This filter is added because delta=111 is present only in wikipedia_index_data1.json, which means the partitions from worker 2 will be empty.
+ "PARTITIONED BY DAY\n"
+ "CLUSTERED BY \"__time\"",
datasource
);

// Query context: sequential cluster statistics merging, capped at 3 tasks.
ImmutableMap<String, Object> context = ImmutableMap.of(
MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE,
ClusterStatisticsMergeMode.SEQUENTIAL,
MultiStageQueryContext.CTX_MAX_NUM_TASKS,
3
);

// Submit the task and wait for the datasource to get loaded
SqlQuery sqlQuery = new SqlQuery(queryLocal, null, false, false, false, context, null);
SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTask(sqlQuery);

// Fail fast with the reported error if the submission itself came back in a failure state.
if (sqlTaskStatus.getState().isFailure()) {
Assert.fail(StringUtils.format(
"Unable to start the task successfully.\nPossible exception: %s",
sqlTaskStatus.getError()

Check notice

Code scanning / CodeQL

Use of default toString()

Default toString(): ErrorResponse inherits toString() from Object, and so is not suitable for printing.
));
}

// Wait for the task to succeed and for the datasource to become ready before querying it.
msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
dataLoaderHelper.waitUntilDatasourceIsReady(datasource);

msqHelper.testQueriesFromFile("/multi-stage-query/wikipedia_msq_select_query_sequential_test.json", datasource);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[
{
"query": "SELECT __time, isRobot, added, delta, deleted, namespace FROM %%DATASOURCE%%",
"expectedResults": [
{
"__time": 1377933081000,
"isRobot": "",
"added": 123,
"delta": 111,
"deleted": 12,
"namespace":"article"
}
]
}
]