From 1f247064b0f243168430f73c03cd55751bef2bee Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 21 Aug 2018 17:09:43 -0700 Subject: [PATCH 1/3] Fix timeout in KafkaSupervisorTest.testCheckpointForInactiveTaskGroup --- .../indexing/kafka/supervisor/KafkaSupervisorTest.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 4d062d82abdf..9341c5d5a9d2 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -2115,13 +2115,9 @@ public void testCheckpointForInactiveTaskGroup() verifyAll(); - while (serviceEmitter.getStackTrace() != null) { - Thread.sleep(100); - } - - Assert.assertNull(serviceEmitter.getStackTrace()); - Assert.assertNull(serviceEmitter.getExceptionMessage()); - Assert.assertNull(serviceEmitter.getExceptionClass()); + Assert.assertNull(serviceEmitter.getStackTrace(), serviceEmitter.getStackTrace()); + Assert.assertNull(serviceEmitter.getExceptionMessage(), serviceEmitter.getExceptionMessage()); + Assert.assertNull(serviceEmitter.getExceptionClass().getCanonicalName(), serviceEmitter.getExceptionClass()); } @Test(timeout = 60_000L) From 366030c73f96c50a70f691e26af6ad1efa26e337 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 22 Aug 2018 12:52:18 -0700 Subject: [PATCH 2/3] fix npe --- .../io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 9341c5d5a9d2..db8e6f8f416e 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -2117,7 +2117,7 @@ public void testCheckpointForInactiveTaskGroup() Assert.assertNull(serviceEmitter.getStackTrace(), serviceEmitter.getStackTrace()); Assert.assertNull(serviceEmitter.getExceptionMessage(), serviceEmitter.getExceptionMessage()); - Assert.assertNull(serviceEmitter.getExceptionClass().getCanonicalName(), serviceEmitter.getExceptionClass()); + Assert.assertNull(serviceEmitter.getExceptionClass()); } @Test(timeout = 60_000L) From e2687a27ae24a7cbd0da836588d2372630b17fbf Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 23 Aug 2018 14:18:24 -0700 Subject: [PATCH 3/3] add taskRunner.getRunningTasks() --- .../indexing/kafka/supervisor/KafkaSupervisorTest.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index db8e6f8f416e..8937f6079511 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -2061,6 +2061,14 @@ public void testCheckpointForInactiveTaskGroup() null ); + final TaskLocation location1 = new TaskLocation("testHost", 1234, -1); + final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); + Collection workItems = new ArrayList<>(); + workItems.add(new TestTaskRunnerWorkItem(id1, null, location1)); + workItems.add(new TestTaskRunnerWorkItem(id2, null, location2)); + workItems.add(new TestTaskRunnerWorkItem(id2, null, location2)); + + expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();